Merge "Add error handling tests: invalid workflow input, error in first task"
This commit is contained in:
commit
0dac2b6d83
@ -15,6 +15,7 @@
|
||||
from oslo_config import cfg
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral import exceptions as exc
|
||||
from mistral.services import workflows as wf_service
|
||||
from mistral.tests.unit.engine import base
|
||||
from mistral.workflow import states
|
||||
@ -26,6 +27,68 @@ cfg.CONF.set_default('auth_enable', False, group='pecan')
|
||||
|
||||
|
||||
class ErrorHandlingEngineTest(base.EngineTestCase):
|
||||
def test_invalid_workflow_input(self):
|
||||
# Check that in case of invalid input workflow objects aren't even
|
||||
# created.
|
||||
wf_text = """
|
||||
version: '2.0'
|
||||
|
||||
wf:
|
||||
input:
|
||||
- param1
|
||||
- param2
|
||||
|
||||
tasks:
|
||||
task1:
|
||||
action: std.noop
|
||||
"""
|
||||
|
||||
wf_service.create_workflows(wf_text)
|
||||
|
||||
self.assertRaises(
|
||||
exc.InputException,
|
||||
self.engine.start_workflow,
|
||||
'wf',
|
||||
{'wrong_param': 'some_value'}
|
||||
)
|
||||
|
||||
self.assertEqual(0, len(db_api.get_workflow_executions()))
|
||||
self.assertEqual(0, len(db_api.get_task_executions()))
|
||||
self.assertEqual(0, len(db_api.get_action_executions()))
|
||||
|
||||
def test_first_task_error(self):
|
||||
# Check that in case of an error in first task workflow objects are
|
||||
# still persisted properly.
|
||||
wf_text = """
|
||||
version: '2.0'
|
||||
|
||||
wf:
|
||||
tasks:
|
||||
task1:
|
||||
action: std.fail
|
||||
on-success: task2
|
||||
|
||||
task2:
|
||||
action: std.noop
|
||||
"""
|
||||
|
||||
wf_service.create_workflows(wf_text)
|
||||
|
||||
wf_ex = self.engine.start_workflow('wf', {})
|
||||
|
||||
self.assertEqual(states.RUNNING, wf_ex.state)
|
||||
self.assertIsNotNone(db_api.get_workflow_execution(wf_ex.id))
|
||||
|
||||
self.await_workflow_error(wf_ex.id)
|
||||
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
task_execs = wf_ex.task_executions
|
||||
|
||||
self.assertEqual(1, len(task_execs))
|
||||
|
||||
self._assert_single_item(task_execs, name='task1', state=states.ERROR)
|
||||
|
||||
def test_action_error(self):
|
||||
# Check that state of all workflow objects (workflow executions,
|
||||
# task executions, action executions) is properly persisted in case
|
||||
|
Loading…
x
Reference in New Issue
Block a user