diff --git a/mistral/tests/unit/workflow/test_reverse_workflow.py b/mistral/tests/unit/workflow/test_reverse_workflow.py index 02aebedff..cc2f491a9 100644 --- a/mistral/tests/unit/workflow/test_reverse_workflow.py +++ b/mistral/tests/unit/workflow/test_reverse_workflow.py @@ -15,6 +15,8 @@ # limitations under the License. from mistral.db.v2.sqlalchemy import models +# TODO(rakhmerov): Should the next two be in package 'workflow'? +from mistral.engine1 import base as eng_base from mistral.engine1 import states from mistral.openstack.common import log as logging from mistral.tests import base @@ -55,8 +57,22 @@ class ReverseWorkflowHandlerTest(base.BaseTest): }) self.exec_db = exec_db + self.wb_spec = wb_spec self.handler = r_wf.ReverseWorkflowHandler(exec_db) + def _create_db_task(self, id, name, state): + tasks_spec = self.wb_spec.get_workflows()['wf1'].get_tasks() + + task_db = models.Task() + task_db.update({ + 'id': id, + 'name': name, + 'spec': tasks_spec[name].to_dict(), + 'state': state + }) + + return task_db + def test_start_workflow(self): task_specs = self.handler.start_workflow(task_name='task2') @@ -64,9 +80,41 @@ class ReverseWorkflowHandlerTest(base.BaseTest): self.assertEqual('task1', task_specs[0].get_name()) self.assertEqual(states.RUNNING, self.exec_db.state) - def test_on_task_result_workflow(self): - # TODO(rakhmerov): Implement. - pass + def test_on_task_result(self): + self.exec_db.update({'state': states.RUNNING}) + + task1_db = self._create_db_task('1-1-1-1', 'task1', states.RUNNING) + task2_db = self._create_db_task('1-1-1-2', 'task2', states.IDLE) + + self.exec_db.tasks.append(task1_db) + self.exec_db.tasks.append(task2_db) + + # Emulate finishing 'task1'. + task_specs = self.handler.on_task_result( + task1_db, + eng_base.TaskResult(data='Hey') + ) + + self.assertEqual(1, len(task_specs)) + self.assertEqual('task2', task_specs[0].get_name()) + + self.assertEqual(states.RUNNING, self.exec_db.state) + self.assertEqual(states.SUCCESS, task1_db.state) + self.assertEqual(states.IDLE, task2_db.state) + + # Emulate finishing 'task2'. + task2_db.state = states.RUNNING + + task_specs = self.handler.on_task_result( + task2_db, + eng_base.TaskResult(data='Hi!') + ) + + self.assertEqual(0, len(task_specs)) + + self.assertEqual(states.SUCCESS, self.exec_db.state) + self.assertEqual(states.SUCCESS, task1_db.state) + self.assertEqual(states.SUCCESS, task2_db.state) def test_stop_workflow(self): # TODO(rakhmerov): Implement. diff --git a/mistral/workflow/base.py b/mistral/workflow/base.py index f246fe0e3..2d167d950 100644 --- a/mistral/workflow/base.py +++ b/mistral/workflow/base.py @@ -70,16 +70,7 @@ class WorkflowHandler(object): :return: Execution object. """ - state = self.exec_db.state - - if states.is_valid_transition(state, states.STOPPED): - self.exec_db.state = states.STOPPED - - LOG.info('Stopped workflow [execution=%s]' % self.exec_db) - else: - msg = "Can't change workflow state [execution=%s," \ - " state=%s -> %s]" % (self.exec_db, state, states.STOPPED) - raise exc.WorkflowException(msg) + self._set_execution_state(states.STOPPED) return self.exec_db @@ -88,21 +79,22 @@ class WorkflowHandler(object): :return: Tasks available to run. """ - state = self.exec_db.state - - if states.is_valid_transition(state, states.RUNNING): - self.exec_db.state = states.RUNNING - - LOG.info('Resumed workflow [execution=%s]' % self.exec_db) - else: - msg = "Can't change workflow state [execution=%s," \ - " state=%s -> %s]" % (self.exec_db, state, states.RUNNING) - raise exc.WorkflowException(msg) + self._set_execution_state(states.RUNNING) # TODO(rakhmerov): A concrete handler should also find tasks to run. return [] + def _set_execution_state(self, state): + cur_state = self.exec_db.state + + if states.is_valid_transition(cur_state, state): + self.exec_db.state = state + else: + msg = "Can't change workflow state [execution=%s," \ + " state=%s -> %s]" % (self.exec_db, cur_state, state) + raise exc.WorkflowException(msg) + class FlowControl(object): """Flow control structure. diff --git a/mistral/workflow/reverse_workflow.py b/mistral/workflow/reverse_workflow.py index 4eec4caec..483c28325 100644 --- a/mistral/workflow/reverse_workflow.py +++ b/mistral/workflow/reverse_workflow.py @@ -45,35 +45,34 @@ class ReverseWorkflowHandler(base.WorkflowHandler): (self.wf_spec, task_name) raise exc.WorkflowException(msg) - task_specs = self._find_tasks_with_no_dependencies(task_spec) + task_specs = self._find_tasks_without_dependencies(task_spec) if len(task_specs) > 0: - state = self.exec_db.state - - if states.is_valid_transition(self.exec_db.state, states.RUNNING): - self.exec_db.state = states.RUNNING - else: - msg = "Can't change workflow state [execution=%s," \ - " state=%s -> %s]" % \ - (self.exec_db, state, states.RUNNING) - raise exc.WorkflowException(msg) + self._set_execution_state(states.RUNNING) return task_specs def on_task_result(self, task_db, task_result): task_db.state = \ states.ERROR if task_result.is_error() else states.SUCCESS - task_db.output = task_result.data + + # TODO(rakhmerov): Temporary hack. We need to use data flow here. + task_db.output = {'result': task_result.data} if task_db.state == states.ERROR: - # No need to check state transition since it's possible to switch - # to ERROR state from any other state. - self.exec_db.state = states.ERROR + # TODO(rakhmerov): Temporary hack, need to use policies. + self._set_execution_state(states.ERROR) + return [] - return self._find_resolved_tasks() + task_specs = self._find_resolved_tasks() - def _find_tasks_with_no_dependencies(self, task_spec): + if len(task_specs) == 0: + self._set_execution_state(states.SUCCESS) + + return task_specs + + def _find_tasks_without_dependencies(self, task_spec): """Given a target task name finds tasks with no dependencies. :param task_spec: Target task specification in the workflow graph @@ -126,5 +125,23 @@ class ReverseWorkflowHandler(base.WorkflowHandler): :return: Tasks with resolved dependencies. """ - # TODO(rakhmerov): Implement. - raise NotImplementedError + + tasks_db = self.exec_db.tasks + + # We need to analyse the graph and see which tasks are ready to start. + resolved_task_specs = [] + success_task_names = set() + + for t in tasks_db: + if t.state == states.SUCCESS: + success_task_names.add(t.name) + + for t in tasks_db: + t_spec = self.wf_spec.get_tasks()[t.name] + + if not (set(t_spec.get_requires()) - success_task_names): + # All required tasks, if any, are SUCCESS. + if t.state == states.IDLE: + resolved_task_specs.append(t_spec) + + return resolved_task_specs