Working on reverse workflow: on_task_result()

* Implementing ReverseWorkflowHandler.on_task_result()
* Unit tests

TODO:
 * Integrate handler code with Data Flow to process raw task result

Change-Id: I1879127bfa1346c179a571b8bd98fa901a90ccd5
This commit is contained in:
Renat Akhmerov 2014-08-06 16:19:31 +07:00
parent 4158f3b28f
commit 2e14bb0ea9
3 changed files with 98 additions and 41 deletions

View File

@ -15,6 +15,8 @@
# limitations under the License. # limitations under the License.
from mistral.db.v2.sqlalchemy import models from mistral.db.v2.sqlalchemy import models
# TODO(rakhmerov): Should the next two be in package 'workflow'?
from mistral.engine1 import base as eng_base
from mistral.engine1 import states from mistral.engine1 import states
from mistral.openstack.common import log as logging from mistral.openstack.common import log as logging
from mistral.tests import base from mistral.tests import base
@ -55,8 +57,22 @@ class ReverseWorkflowHandlerTest(base.BaseTest):
}) })
self.exec_db = exec_db self.exec_db = exec_db
self.wb_spec = wb_spec
self.handler = r_wf.ReverseWorkflowHandler(exec_db) self.handler = r_wf.ReverseWorkflowHandler(exec_db)
def _create_db_task(self, id, name, state):
tasks_spec = self.wb_spec.get_workflows()['wf1'].get_tasks()
task_db = models.Task()
task_db.update({
'id': id,
'name': name,
'spec': tasks_spec[name].to_dict(),
'state': state
})
return task_db
def test_start_workflow(self): def test_start_workflow(self):
task_specs = self.handler.start_workflow(task_name='task2') task_specs = self.handler.start_workflow(task_name='task2')
@ -64,9 +80,41 @@ class ReverseWorkflowHandlerTest(base.BaseTest):
self.assertEqual('task1', task_specs[0].get_name()) self.assertEqual('task1', task_specs[0].get_name())
self.assertEqual(states.RUNNING, self.exec_db.state) self.assertEqual(states.RUNNING, self.exec_db.state)
def test_on_task_result_workflow(self): def test_on_task_result(self):
# TODO(rakhmerov): Implement. self.exec_db.update({'state': states.RUNNING})
pass
task1_db = self._create_db_task('1-1-1-1', 'task1', states.RUNNING)
task2_db = self._create_db_task('1-1-1-2', 'task2', states.IDLE)
self.exec_db.tasks.append(task1_db)
self.exec_db.tasks.append(task2_db)
# Emulate finishing 'task1'.
task_specs = self.handler.on_task_result(
task1_db,
eng_base.TaskResult(data='Hey')
)
self.assertEqual(1, len(task_specs))
self.assertEqual('task2', task_specs[0].get_name())
self.assertEqual(states.RUNNING, self.exec_db.state)
self.assertEqual(states.SUCCESS, task1_db.state)
self.assertEqual(states.IDLE, task2_db.state)
# Emulate finishing 'task2'.
task2_db.state = states.RUNNING
task_specs = self.handler.on_task_result(
task2_db,
eng_base.TaskResult(data='Hi!')
)
self.assertEqual(0, len(task_specs))
self.assertEqual(states.SUCCESS, self.exec_db.state)
self.assertEqual(states.SUCCESS, task1_db.state)
self.assertEqual(states.SUCCESS, task2_db.state)
def test_stop_workflow(self): def test_stop_workflow(self):
# TODO(rakhmerov): Implement. # TODO(rakhmerov): Implement.

View File

@ -70,16 +70,7 @@ class WorkflowHandler(object):
:return: Execution object. :return: Execution object.
""" """
state = self.exec_db.state self._set_execution_state(states.STOPPED)
if states.is_valid_transition(state, states.STOPPED):
self.exec_db.state = states.STOPPED
LOG.info('Stopped workflow [execution=%s]' % self.exec_db)
else:
msg = "Can't change workflow state [execution=%s," \
" state=%s -> %s]" % (self.exec_db, state, states.STOPPED)
raise exc.WorkflowException(msg)
return self.exec_db return self.exec_db
@ -88,21 +79,22 @@ class WorkflowHandler(object):
:return: Tasks available to run. :return: Tasks available to run.
""" """
state = self.exec_db.state self._set_execution_state(states.RUNNING)
if states.is_valid_transition(state, states.RUNNING):
self.exec_db.state = states.RUNNING
LOG.info('Resumed workflow [execution=%s]' % self.exec_db)
else:
msg = "Can't change workflow state [execution=%s," \
" state=%s -> %s]" % (self.exec_db, state, states.RUNNING)
raise exc.WorkflowException(msg)
# TODO(rakhmerov): A concrete handler should also find tasks to run. # TODO(rakhmerov): A concrete handler should also find tasks to run.
return [] return []
def _set_execution_state(self, state):
cur_state = self.exec_db.state
if states.is_valid_transition(cur_state, state):
self.exec_db.state = state
else:
msg = "Can't change workflow state [execution=%s," \
" state=%s -> %s]" % (self.exec_db, cur_state, state)
raise exc.WorkflowException(msg)
class FlowControl(object): class FlowControl(object):
"""Flow control structure. """Flow control structure.

View File

@ -45,35 +45,34 @@ class ReverseWorkflowHandler(base.WorkflowHandler):
(self.wf_spec, task_name) (self.wf_spec, task_name)
raise exc.WorkflowException(msg) raise exc.WorkflowException(msg)
task_specs = self._find_tasks_with_no_dependencies(task_spec) task_specs = self._find_tasks_without_dependencies(task_spec)
if len(task_specs) > 0: if len(task_specs) > 0:
state = self.exec_db.state self._set_execution_state(states.RUNNING)
if states.is_valid_transition(self.exec_db.state, states.RUNNING):
self.exec_db.state = states.RUNNING
else:
msg = "Can't change workflow state [execution=%s," \
" state=%s -> %s]" % \
(self.exec_db, state, states.RUNNING)
raise exc.WorkflowException(msg)
return task_specs return task_specs
def on_task_result(self, task_db, task_result): def on_task_result(self, task_db, task_result):
task_db.state = \ task_db.state = \
states.ERROR if task_result.is_error() else states.SUCCESS states.ERROR if task_result.is_error() else states.SUCCESS
task_db.output = task_result.data
# TODO(rakhmerov): Temporary hack. We need to use data flow here.
task_db.output = {'result': task_result.data}
if task_db.state == states.ERROR: if task_db.state == states.ERROR:
# No need to check state transition since it's possible to switch # TODO(rakhmerov): Temporary hack, need to use policies.
# to ERROR state from any other state. self._set_execution_state(states.ERROR)
self.exec_db.state = states.ERROR
return [] return []
return self._find_resolved_tasks() task_specs = self._find_resolved_tasks()
def _find_tasks_with_no_dependencies(self, task_spec): if len(task_specs) == 0:
self._set_execution_state(states.SUCCESS)
return task_specs
def _find_tasks_without_dependencies(self, task_spec):
"""Given a target task name finds tasks with no dependencies. """Given a target task name finds tasks with no dependencies.
:param task_spec: Target task specification in the workflow graph :param task_spec: Target task specification in the workflow graph
@ -126,5 +125,23 @@ class ReverseWorkflowHandler(base.WorkflowHandler):
:return: Tasks with resolved dependencies. :return: Tasks with resolved dependencies.
""" """
# TODO(rakhmerov): Implement.
raise NotImplementedError tasks_db = self.exec_db.tasks
# We need to analyse the graph and see which tasks are ready to start.
resolved_task_specs = []
success_task_names = set()
for t in tasks_db:
if t.state == states.SUCCESS:
success_task_names.add(t.name)
for t in tasks_db:
t_spec = self.wf_spec.get_tasks()[t.name]
if not (set(t_spec.get_requires()) - success_task_names):
# All required tasks, if any, are SUCCESS.
if t.state == states.IDLE:
resolved_task_specs.append(t_spec)
return resolved_task_specs