diff --git a/mistral/engine/abstract_engine.py b/mistral/engine/abstract_engine.py index 90fd714f6..59841637d 100644 --- a/mistral/engine/abstract_engine.py +++ b/mistral/engine/abstract_engine.py @@ -55,7 +55,8 @@ class AbstractEngine(object): workbook_name, execution['id'] ) - tasks_to_start = workflow.find_resolved_tasks(tasks) + tasks_to_start, delayed_tasks = workflow.find_resolved_tasks(tasks) + context = cls._add_token_to_context( context, db_api.workbook_get(workbook_name)) data_flow.prepare_tasks(tasks_to_start, context) @@ -68,6 +69,9 @@ class AbstractEngine(object): finally: db_api.end_tx() + for task in delayed_tasks: + cls._schedule_run(workbook, task, context) + cls._run_tasks(tasks_to_start) return execution @@ -105,7 +109,8 @@ class AbstractEngine(object): LOG.info("Changed execution state: %s" % execution) - tasks_to_start = workflow.find_resolved_tasks(tasks) + tasks_to_start, delayed_tasks = workflow.find_resolved_tasks(tasks) + outbound_context = cls._add_token_to_context( outbound_context, db_api.workbook_get(workbook_name)) data_flow.prepare_tasks(tasks_to_start, outbound_context) @@ -121,7 +126,7 @@ class AbstractEngine(object): if states.is_stopped_or_finished(execution["state"]): return task - if task['state'] == states.DELAYED: + for task in delayed_tasks: cls._schedule_run(workbook, task, outbound_context) if tasks_to_start: diff --git a/mistral/engine/workflow.py b/mistral/engine/workflow.py index 406800c1e..38366c8af 100644 --- a/mistral/engine/workflow.py +++ b/mistral/engine/workflow.py @@ -137,6 +137,7 @@ def _update_dependencies(tasks, graph): def _get_resolved_tasks(tasks): resolved_tasks = [] + delayed_tasks = [] allows = [] for t in tasks: if t['state'] == states.SUCCESS: @@ -147,4 +148,6 @@ def _get_resolved_tasks(tasks): if len(set(deps) - allow_set) == 0: if t['state'] == states.IDLE: resolved_tasks.append(t) - return resolved_tasks + elif t['state'] == states.DELAYED: + delayed_tasks.append(t) + return resolved_tasks, delayed_tasks diff --git a/mistral/tests/resources/repeat_task/single_repeat_task.yaml b/mistral/tests/resources/repeat_task/single_repeat_task.yaml index dbf32f016..9453da3a2 100644 --- a/mistral/tests/resources/repeat_task/single_repeat_task.yaml +++ b/mistral/tests/resources/repeat_task/single_repeat_task.yaml @@ -10,8 +10,8 @@ Workflow: tasks: repeater_task: repeat: - iterations: 5 - delay: 0 + iterations: 3 + delay: 1 action: MyService:repeatable-action publish: greet_msg: greeting @@ -24,3 +24,9 @@ Workflow: action: MyService:repeatable-action publish: greet_msg: greeting + + not_repeat_task: + action: MyService:repeatable-action + publish: + greet_msg: greeting + on-success: repeater_task diff --git a/mistral/tests/unit/engine/test_data_flow.py b/mistral/tests/unit/engine/test_data_flow.py index 78081e593..2b657c68a 100644 --- a/mistral/tests/unit/engine/test_data_flow.py +++ b/mistral/tests/unit/engine/test_data_flow.py @@ -49,7 +49,7 @@ CONTEXT = { } } -cfg.CONF.pecan.auth_token = False +cfg.CONF.pecan.auth_enable = False def create_workbook(definition_path): diff --git a/mistral/tests/unit/engine/test_repeat_task.py b/mistral/tests/unit/engine/test_repeat_task.py index 2f54974b0..b9e9084e2 100644 --- a/mistral/tests/unit/engine/test_repeat_task.py +++ b/mistral/tests/unit/engine/test_repeat_task.py @@ -14,10 +14,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import eventlet + +from oslo.config import cfg + from mistral.db import api as db_api from mistral.tests import base from mistral.engine.local import engine -from oslo.config import cfg +from mistral import dsl_parser from mistral.openstack.common import importutils # We need to make sure that all configuration properties are registered. @@ -35,15 +39,20 @@ def create_workbook(workbook_name, definition_path): class RepeatTaskTest(base.DbTestCase): - def test_simple_repeat_task(self): wb = create_workbook('wb_1', 'repeat_task/single_repeat_task.yaml') execution = ENGINE.start_workflow_execution(wb['name'], 'repeater_task', None) + wb_spec = dsl_parser.get_workbook(wb['definition']) + iterations, _, delay = wb_spec.tasks.get('repeater_task').\ + get_repeat_task_parameters() + + # Wait until all iterations are finished + eventlet.sleep(delay * iterations + 1) tasks = db_api.tasks_get(wb['name'], execution['id']) self._assert_single_item(tasks, name='repeater_task') self._assert_single_item(tasks, task_runtime_context={ - "iteration_no": 4}) + "iteration_no": 2}) def test_no_repeat_task(self): wb = create_workbook('wb_2', 'repeat_task/no_repeat_task.yaml') @@ -62,3 +71,18 @@ class RepeatTaskTest(base.DbTestCase): self._assert_single_item(tasks, name='repeater_task_break_early') self._assert_single_item(tasks, task_runtime_context={ "iteration_no": 0}) + + def test_from_no_repeat_to_repeat_task(self): + wb = create_workbook('wb_4', 'repeat_task/single_repeat_task.yaml') + execution = ENGINE.start_workflow_execution( + wb['name'], 'not_repeat_task', None) + wb_spec = dsl_parser.get_workbook(wb['definition']) + iterations, _, delay = wb_spec.tasks.get('repeater_task').\ + get_repeat_task_parameters() + + # Wait until all iterations are finished + eventlet.sleep(delay * iterations + 1) + tasks = db_api.tasks_get(wb['name'], execution['id']) + self._assert_multiple_items(tasks, 2) + self._assert_single_item(tasks, name='repeater_task') + self._assert_single_item(tasks, task_runtime_context=None)