Refactor workflow controller and fix a bug in _fail_workflow()

* Method get_controller is moved out from WorkflowController class
  because it's not related with its functionality directly
* Fixed tests accordingly
* "not found" test has been removed because there's no way now to
  make "not found" exceptin get raised. In order to make it happen
  we need to have a new workflow specification class w/o corresponding
  WorkflowController implementation. So that exception is just left
  just to check ourselves when we're working on a new WorkflowController
  implementation.

Change-Id: I0330870e4382f01c4519b5c48e43ac50a08db338
This commit is contained in:
Renat Akhmerov 2016-03-23 14:17:31 +06:00
parent 3e9aa8b310
commit 9a1a157274
5 changed files with 104 additions and 92 deletions

View File

@ -73,10 +73,7 @@ class DefaultEngine(base.Engine, coordination.Service):
wf_ex = db_api.get_workflow_execution(wf_ex_id) wf_ex = db_api.get_workflow_execution(wf_ex_id)
wf_handler.set_execution_state(wf_ex, states.RUNNING) wf_handler.set_execution_state(wf_ex, states.RUNNING)
wf_ctrl = wf_base.WorkflowController.get_controller( wf_ctrl = wf_base.get_controller(wf_ex, wf_spec)
wf_ex,
wf_spec
)
self._dispatch_workflow_commands( self._dispatch_workflow_commands(
wf_ex, wf_ex,
@ -178,7 +175,7 @@ class DefaultEngine(base.Engine, coordination.Service):
if task_ex.state == states.RUNNING_DELAYED: if task_ex.state == states.RUNNING_DELAYED:
return return
wf_ctrl = wf_base.WorkflowController.get_controller(wf_ex, wf_spec) wf_ctrl = wf_base.get_controller(wf_ex, wf_spec)
# Calculate commands to process next. # Calculate commands to process next.
cmds = wf_ctrl.continue_workflow() cmds = wf_ctrl.continue_workflow()
@ -302,7 +299,7 @@ class DefaultEngine(base.Engine, coordination.Service):
set_upstream=True set_upstream=True
) )
wf_ctrl = wf_base.WorkflowController.get_controller(wf_ex) wf_ctrl = wf_base.get_controller(wf_ex)
# Calculate commands to process next. # Calculate commands to process next.
cmds = wf_ctrl.continue_workflow(task_ex=task_ex, reset=reset, env=env) cmds = wf_ctrl.continue_workflow(task_ex=task_ex, reset=reset, env=env)
@ -390,7 +387,7 @@ class DefaultEngine(base.Engine, coordination.Service):
@staticmethod @staticmethod
def _stop_workflow(wf_ex, state, message=None): def _stop_workflow(wf_ex, state, message=None):
if state == states.SUCCESS: if state == states.SUCCESS:
wf_ctrl = wf_base.WorkflowController.get_controller(wf_ex) wf_ctrl = wf_base.get_controller(wf_ex)
final_context = {} final_context = {}
try: try:
@ -469,6 +466,7 @@ class DefaultEngine(base.Engine, coordination.Service):
task_handler.on_action_complete( task_handler.on_action_complete(
action_ex, action_ex,
spec_parser.get_workflow_spec(wf_ex.spec),
wf_utils.Result(error=err_msg) wf_utils.Result(error=err_msg)
) )

View File

@ -162,7 +162,7 @@ def on_action_complete(action_ex, wf_spec, result):
scheduled for execution. scheduled for execution.
:param action_ex: Action execution objects the result belongs to. :param action_ex: Action execution objects the result belongs to.
:param wf_spec: Worflow specification. :param wf_spec: Workflow specification.
:param result: Task action/workflow output wrapped into :param result: Task action/workflow output wrapped into
mistral.workflow.utils.Result instance. mistral.workflow.utils.Result instance.
:return List of engine commands that need to be performed. :return List of engine commands that need to be performed.

View File

@ -1,59 +0,0 @@
# Copyright 2015 - Huawei Technologies Co. Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from mistral import exceptions
from mistral.tests.unit import base
from mistral.workflow import base as wf_base
from mistral.workflow import direct_workflow
from mistral.workflow import reverse_workflow
class WorkflowControllerTest(base.BaseTest):
def test_get_class_direct(self):
wf_handler_cls = wf_base.WorkflowController._get_class("direct")
self.assertIs(wf_handler_cls, direct_workflow.DirectWorkflowController)
def test_get_class_reverse(self):
wf_handler_cls = wf_base.WorkflowController._get_class("reverse")
self.assertIs(wf_handler_cls,
reverse_workflow.ReverseWorkflowController)
def test_get_class_notfound(self):
exc = self.assertRaises(
exceptions.NotFoundException,
wf_base.WorkflowController._get_class,
"invalid"
)
self.assertIn("Failed to find a workflow controller", str(exc))
@mock.patch("mistral.workbook.parser.get_workflow_spec")
@mock.patch("mistral.workflow.base.WorkflowController._get_class")
def test_get_handler(self, mock_get_class, mock_get_spec):
mock_wf_spec = mock.MagicMock()
mock_wf_spec.get_type.return_value = "direct"
mock_get_spec.return_value = mock_wf_spec
mock_handler_cls = mock.MagicMock()
mock_get_class.return_value = mock_handler_cls
wf_ex = {"spec": "spec"}
wf_base.WorkflowController.get_controller(wf_ex)
mock_get_spec.assert_called_once_with("spec")
mock_get_class.assert_called_once_with("direct")
mock_handler_cls.assert_called_once_with(wf_ex, mock_wf_spec)

View File

@ -0,0 +1,67 @@
# Copyright 2015 - Huawei Technologies Co. Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.tests.unit import base
from mistral.workbook import parser as spec_parser
from mistral.workflow import base as wf_base
from mistral.workflow import direct_workflow as direct_wf
from mistral.workflow import reverse_workflow as reverse_wf
from mistral.db.v2.sqlalchemy import models as db_models
DIRECT_WF = """
---
version: '2.0'
wf:
type: direct
tasks:
task1:
action: std.echo output="Hey"
"""
REVERSE_WF = """
---
version: '2.0'
wf:
type: reverse
tasks:
task1:
action: std.echo output="Hey"
"""
class WorkflowControllerTest(base.BaseTest):
def test_get_controller_direct(self):
wf_spec = spec_parser.get_workflow_list_spec_from_yaml(DIRECT_WF)[0]
wf_ex = db_models.WorkflowExecution(spec=wf_spec.to_dict())
self.assertIsInstance(
wf_base.get_controller(wf_ex, wf_spec),
direct_wf.DirectWorkflowController
)
def test_get_controller_reverse(self):
wf_spec = spec_parser.get_workflow_list_spec_from_yaml(REVERSE_WF)[0]
wf_ex = db_models.WorkflowExecution(spec=wf_spec.to_dict())
self.assertIsInstance(
wf_base.get_controller(wf_ex, wf_spec),
reverse_wf.ReverseWorkflowController
)

View File

@ -31,6 +31,35 @@ from mistral.workflow import utils as wf_utils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
def get_controller(wf_ex, wf_spec=None):
"""Gets a workflow controller instance by given workflow execution object.
:param wf_ex: Workflow execution object.
:param wf_spec: Workflow specification object. If passed, the method works
faster.
:returns: Workflow controller class.
"""
if not wf_spec:
wf_spec = spec_parser.get_workflow_spec(wf_ex['spec'])
wf_type = wf_spec.get_type()
ctrl_cls = None
for cls in u.iter_subclasses(WorkflowController):
if cls.__workflow_type__ == wf_type:
ctrl_cls = cls
break
if not ctrl_cls:
raise exc.NotFoundException(
'Failed to find a workflow controller [type=%s]' % wf_type
)
return ctrl_cls(wf_ex, wf_spec)
class WorkflowController(object): class WorkflowController(object):
"""Workflow Controller base class. """Workflow Controller base class.
@ -48,8 +77,10 @@ class WorkflowController(object):
:param wf_spec: Workflow specification. :param wf_spec: Workflow specification.
""" """
self.wf_ex = wf_ex self.wf_ex = wf_ex
if wf_spec is None: if wf_spec is None:
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec) wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
self.wf_spec = wf_spec self.wf_spec = wf_spec
def _update_task_ex_env(self, task_ex, env): def _update_task_ex_env(self, task_ex, env):
@ -178,28 +209,3 @@ class WorkflowController(object):
def _is_paused_or_completed(self): def _is_paused_or_completed(self):
return states.is_paused_or_completed(self.wf_ex.state) return states.is_paused_or_completed(self.wf_ex.state)
@staticmethod
def _get_class(wf_type):
"""Gets a workflow controller class by given workflow type.
:param wf_type: Workflow type.
:returns: Workflow controller class.
"""
for wf_ctrl_cls in u.iter_subclasses(WorkflowController):
if wf_type == wf_ctrl_cls.__workflow_type__:
return wf_ctrl_cls
raise exc.NotFoundException(
'Failed to find a workflow controller [type=%s]' % wf_type
)
@staticmethod
def get_controller(wf_ex, wf_spec=None):
if not wf_spec:
wf_spec = spec_parser.get_workflow_spec(wf_ex['spec'])
return WorkflowController._get_class(wf_spec.get_type())(
wf_ex,
wf_spec
)