Remove local engine

The local engine is replaced in process executor using the oslo.messaging
fake transport.  The local engine module is removed here along with related
unit tests. Some unit tests are rewritten to use the ScalableEngine. An
EngineTestCase is included in the base module of tests to simplify engine
testing.

Change-Id: I1a49a53eac87a209660c493fd7c3fe3e914fd092
Implements: blueprint mistral-inproc-executor
This commit is contained in:
Winson Chan 2014-04-04 10:20:02 -07:00
parent 7f8530aeda
commit a736f03bf8
8 changed files with 298 additions and 424 deletions

View File

@ -1,67 +0,0 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral import exceptions as exc
from mistral.engine import abstract_engine as abs_eng
from mistral.engine import states
from mistral.engine.actions import action_factory as a_f
from mistral.engine.actions import action_helper as a_h
from mistral.db import api as db_api
from mistral.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class LocalEngine(abs_eng.AbstractEngine):
@classmethod
def _run_tasks(cls, tasks):
LOG.info("Workflow is running, tasks to run: %s" % tasks)
for t in tasks:
cls._run_task(t)
@classmethod
def _run_task(cls, task):
action = a_f.create_action(task)
LOG.info("Task is started - %s" % task['name'])
if a_h.is_task_synchronous(task):
try:
state, result = states.SUCCESS, action.run()
except exc.ActionException:
state, result = states.ERROR, None
cls.convey_task_result(task['workbook_name'],
task['execution_id'],
task['id'],
state, result)
else:
try:
action.run()
db_api.task_update(task['workbook_name'],
task['execution_id'],
task['id'],
{'state': states.RUNNING})
except exc.ActionException:
cls.convey_task_result(task['workbook_name'],
task['execution_id'],
task['id'],
states.ERROR, None)
def get_engine():
return LocalEngine

View File

@ -28,6 +28,10 @@ from oslo.config import cfg
from oslo import messaging from oslo import messaging
from oslo.messaging import transport from oslo.messaging import transport
from mistral.engine import engine
from mistral.engine.scalable.executor import server
from mistral.engine.scalable import engine as concrete_engine
RESOURCES_PATH = 'tests/resources/' RESOURCES_PATH = 'tests/resources/'
@ -104,3 +108,22 @@ class DbTestCase(BaseTest):
def is_db_session_open(self): def is_db_session_open(self):
return db_api._get_thread_local_session() is not None return db_api._get_thread_local_session() is not None
class EngineTestCase(DbTestCase):
def __init__(self, *args, **kwargs):
super(EngineTestCase, self).__init__(*args, **kwargs)
self.transport = get_fake_transport()
engine.load_engine(self.transport)
self.engine = concrete_engine.get_engine()
self.engine.transport = self.transport
@classmethod
def mock_run_tasks(cls, tasks):
"""
Mock the engine _run_tasks to send requests directly to the task
executor instead of going through the oslo.messaging transport.
"""
executor = server.Executor()
for task in tasks:
executor.handle_task({}, task=task)

View File

@ -1,306 +0,0 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from mistral.db import api as db_api
from mistral.engine.actions import actions
from mistral.engine import expressions
from mistral.engine.local import engine
from mistral.engine import states
from mistral.openstack.common import importutils
from mistral.tests import base
importutils.import_module("mistral.config")
ENGINE = engine.get_engine()
WB_NAME = "my_workbook"
CONTEXT = None # TODO(rakhmerov): Use a meaningful value.
#TODO(rakhmerov): add more tests for errors, execution stop etc.
def print_tasks(tasks):
for t in tasks:
print t['name'], t['state']
class TestLocalEngine(base.DbTestCase):
@mock.patch.object(db_api, "workbook_get",
mock.MagicMock(return_value={
'definition': base.get_resource("test_rest.yaml")
}))
@mock.patch.object(actions.RestAction, "run",
mock.MagicMock(return_value={'state': states.RUNNING}))
@mock.patch.object(expressions, "evaluate",
mock.MagicMock(side_effect=lambda x, y: x))
def test_engine_one_task(self):
# Start workflow.
execution = ENGINE.start_workflow_execution(WB_NAME, "create-vms",
CONTEXT)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
self.assertEqual(1, len(tasks))
self._assert_single_item(tasks,
name='create-vms',
state=states.RUNNING)
# Make 'create-vms' task successful.
ENGINE.convey_task_result(WB_NAME, execution['id'], tasks[0]['id'],
states.SUCCESS, None)
execution = db_api.execution_get(WB_NAME, execution['id'])
self.assertEqual(execution['state'], states.SUCCESS)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
self.assertEqual(1, len(tasks))
self._assert_single_item(tasks,
name='create-vms',
state=states.SUCCESS)
@mock.patch.object(db_api, "workbook_get",
mock.MagicMock(return_value={
'definition': base.get_resource("test_rest.yaml")
}))
@mock.patch.object(actions.RestAction, "run",
mock.MagicMock(return_value={'state': states.RUNNING}))
@mock.patch.object(expressions, "evaluate",
mock.MagicMock(side_effect=lambda x, y: x))
def test_engine_multiple_tasks(self):
# Start workflow.
execution = ENGINE.start_workflow_execution(WB_NAME, "backup-vms",
CONTEXT)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
task = self._assert_single_item(tasks,
name='create-vms',
state=states.RUNNING)
# Make 'create-vms' task successful.
ENGINE.convey_task_result(WB_NAME, execution['id'],
task['id'],
states.SUCCESS, None)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
self.assertEqual(2, len(tasks))
self._assert_single_item(tasks,
name='create-vms',
state=states.SUCCESS)
self._assert_single_item(tasks,
name='backup-vms',
state=states.RUNNING)
self.assertEqual(states.RUNNING,
ENGINE.get_workflow_execution_state(WB_NAME,
execution['id']))
task = self._assert_single_item(tasks, name='backup-vms')
# Make 'backup-vms' task successful.
ENGINE.convey_task_result(WB_NAME, execution['id'],
task['id'],
states.SUCCESS, None)
execution = db_api.execution_get(WB_NAME, execution['id'])
self.assertEqual(execution['state'], states.SUCCESS)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
self._assert_single_item(tasks,
name='create-vms',
state=states.SUCCESS)
self._assert_single_item(tasks,
name='backup-vms',
state=states.SUCCESS)
self.assertEqual(states.SUCCESS,
ENGINE.get_workflow_execution_state(WB_NAME,
execution['id']))
@mock.patch.object(actions.RestAction, "run",
mock.MagicMock(return_value={'state': states.SUCCESS}))
@mock.patch.object(db_api, "workbook_get",
mock.MagicMock(return_value={
'definition': base.get_resource("test_rest.yaml")
}))
@mock.patch.object(states, "get_state_by_http_status_code",
mock.MagicMock(return_value=states.SUCCESS))
@mock.patch.object(expressions, "evaluate",
mock.MagicMock(side_effect=lambda x, y: x))
def test_engine_sync_task(self):
execution = ENGINE.start_workflow_execution(WB_NAME, "create-vm-nova",
CONTEXT)
task = db_api.tasks_get(WB_NAME, execution['id'])[0]
execution = db_api.execution_get(WB_NAME, execution['id'])
self.assertEqual(execution['state'], states.SUCCESS)
self.assertEqual(task['state'], states.SUCCESS)
@mock.patch.object(db_api, "workbook_get",
mock.MagicMock(return_value={
'definition': base.get_resource("test_rest.yaml")
}))
@mock.patch.object(actions.RestAction, "run",
mock.MagicMock(return_value={'state': states.SUCCESS}))
@mock.patch.object(expressions, "evaluate",
mock.MagicMock(side_effect=lambda x, y: x))
def test_engine_tasks_on_success_finish(self):
# Start workflow.
execution = ENGINE.start_workflow_execution(WB_NAME, "test_subsequent",
CONTEXT)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
self.assertEqual(len(tasks), 1)
execution = db_api.execution_get(WB_NAME, execution['id'])
task = self._assert_single_item(tasks, name='test_subsequent')
# Make 'test_subsequent' task successful.
ENGINE.convey_task_result(WB_NAME, execution['id'],
task['id'],
states.SUCCESS, None)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
self.assertEqual(len(tasks), 4)
self._assert_single_item(tasks,
name='test_subsequent',
state=states.SUCCESS)
self._assert_single_item(tasks,
name='attach-volumes',
state=states.IDLE)
tasks2 = self._assert_multiple_items(tasks, 2,
name='create-vms',
state=states.RUNNING)
# Make 2 'create-vms' tasks successful.
ENGINE.convey_task_result(WB_NAME, execution['id'],
tasks2[0]['id'],
states.SUCCESS, None)
ENGINE.convey_task_result(WB_NAME, execution['id'],
tasks2[1]['id'],
states.SUCCESS, None)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
self._assert_multiple_items(tasks, 2,
name='create-vms',
state=states.SUCCESS)
task = self._assert_single_item(tasks,
name='attach-volumes',
state=states.RUNNING)
# Make 'attach-volumes' task successful.
ENGINE.convey_task_result(WB_NAME, execution['id'],
task['id'],
states.SUCCESS, None)
execution = db_api.execution_get(WB_NAME, execution['id'])
tasks = db_api.tasks_get(WB_NAME, execution['id'])
self.assertEqual(execution['state'], states.SUCCESS)
self._assert_multiple_items(tasks, 4, state=states.SUCCESS)
@mock.patch.object(db_api, "workbook_get",
mock.MagicMock(return_value={
'definition': base.get_resource("test_rest.yaml")
}))
@mock.patch.object(actions.RestAction, "run",
mock.MagicMock(return_value={'state': states.SUCCESS}))
@mock.patch.object(expressions, "evaluate",
mock.MagicMock(side_effect=lambda x, y: x))
def test_engine_tasks_on_error_finish(self):
# Start workflow.
execution = ENGINE.start_workflow_execution(WB_NAME, "test_subsequent",
CONTEXT)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
execution = db_api.execution_get(WB_NAME, execution['id'])
# Make 'test_subsequent' task successful.
ENGINE.convey_task_result(WB_NAME, execution['id'],
tasks[0]['id'],
states.ERROR, None)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
self.assertEqual(len(tasks), 6)
self._assert_single_item(tasks,
name='backup-vms',
state=states.IDLE)
self._assert_single_item(tasks,
name='test_subsequent',
state=states.ERROR)
self._assert_single_item(tasks,
name='attach-volumes',
state=states.IDLE)
tasks2 = self._assert_multiple_items(tasks, 3,
name='create-vms',
state=states.RUNNING)
# Make 'create-vms' tasks successful.
ENGINE.convey_task_result(WB_NAME, execution['id'],
tasks2[0]['id'],
states.SUCCESS, None)
ENGINE.convey_task_result(WB_NAME, execution['id'],
tasks2[1]['id'],
states.SUCCESS, None)
ENGINE.convey_task_result(WB_NAME, execution['id'],
tasks2[2]['id'],
states.SUCCESS, None)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
task1 = self._assert_single_item(tasks,
name='backup-vms',
state=states.RUNNING)
task2 = self._assert_single_item(tasks,
name='attach-volumes',
state=states.RUNNING)
self._assert_multiple_items(tasks, 3,
name='create-vms',
state=states.SUCCESS)
# Make tasks 'backup-vms' and 'attach-volumes' successful.
ENGINE.convey_task_result(WB_NAME, execution['id'],
task1['id'],
states.SUCCESS, None)
ENGINE.convey_task_result(WB_NAME, execution['id'],
task2['id'],
states.SUCCESS, None)
execution = db_api.execution_get(WB_NAME, execution['id'])
self.assertEqual(execution['state'], states.SUCCESS)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
self._assert_single_item(tasks, state=states.ERROR)
self._assert_multiple_items(tasks, 5, state=states.SUCCESS)

View File

@ -15,23 +15,33 @@
import mock import mock
from oslo.config import cfg
from mistral.openstack.common import log as logging
from mistral.openstack.common import importutils
from mistral.db import api as db_api from mistral.db import api as db_api
from mistral.engine.actions import actions from mistral.engine.actions import actions
from mistral.engine import expressions
from mistral.engine.scalable import engine from mistral.engine.scalable import engine
from mistral.engine import states from mistral.engine import states
from mistral.tests import base from mistral.tests import base
ENGINE = engine.get_engine() # We need to make sure that all configuration properties are registered.
importutils.import_module("mistral.config")
LOG = logging.getLogger(__name__)
WB_NAME = "my_workbook" WB_NAME = "my_workbook"
CONTEXT = None # TODO(rakhmerov): Use a meaningful value. CONTEXT = None # TODO(rakhmerov): Use a meaningful value.
# Use the set_default method to set value otherwise in certain test cases
# the change in value is not permanent.
cfg.CONF.set_default('auth_enable', False, group='pecan')
#TODO(rakhmerov): add more tests for errors, execution stop etc. #TODO(rakhmerov): add more tests for errors, execution stop etc.
class TestScalableEngine(base.DbTestCase): class TestScalableEngine(base.EngineTestCase):
@mock.patch.object(engine.ScalableEngine, "_notify_task_executors", @mock.patch.object(engine.ScalableEngine, "_notify_task_executors",
mock.MagicMock(return_value="")) mock.MagicMock(return_value=""))
@mock.patch.object(db_api, "workbook_get", @mock.patch.object(db_api, "workbook_get",
@ -41,13 +51,13 @@ class TestScalableEngine(base.DbTestCase):
@mock.patch.object(actions.RestAction, "run", @mock.patch.object(actions.RestAction, "run",
mock.MagicMock(return_value="result")) mock.MagicMock(return_value="result"))
def test_engine_one_task(self): def test_engine_one_task(self):
execution = ENGINE.start_workflow_execution(WB_NAME, "create-vms", execution = self.engine.start_workflow_execution(WB_NAME, "create-vms",
CONTEXT) CONTEXT)
task = db_api.tasks_get(WB_NAME, execution['id'])[0] task = db_api.tasks_get(WB_NAME, execution['id'])[0]
ENGINE.convey_task_result(WB_NAME, execution['id'], task['id'], self.engine.convey_task_result(WB_NAME, execution['id'], task['id'],
states.SUCCESS, None) states.SUCCESS, None)
task = db_api.tasks_get(WB_NAME, execution['id'])[0] task = db_api.tasks_get(WB_NAME, execution['id'])[0]
execution = db_api.execution_get(WB_NAME, execution['id']) execution = db_api.execution_get(WB_NAME, execution['id'])
@ -64,14 +74,14 @@ class TestScalableEngine(base.DbTestCase):
@mock.patch.object(actions.RestAction, "run", @mock.patch.object(actions.RestAction, "run",
mock.MagicMock(return_value="result")) mock.MagicMock(return_value="result"))
def test_engine_multiple_tasks(self): def test_engine_multiple_tasks(self):
execution = ENGINE.start_workflow_execution(WB_NAME, "backup-vms", execution = self.engine.start_workflow_execution(WB_NAME, "backup-vms",
CONTEXT) CONTEXT)
tasks = db_api.tasks_get(WB_NAME, execution['id']) tasks = db_api.tasks_get(WB_NAME, execution['id'])
ENGINE.convey_task_result(WB_NAME, execution['id'], self.engine.convey_task_result(WB_NAME, execution['id'],
tasks[0]['id'], tasks[0]['id'],
states.SUCCESS, None) states.SUCCESS, None)
tasks = db_api.tasks_get(WB_NAME, execution['id']) tasks = db_api.tasks_get(WB_NAME, execution['id'])
@ -83,12 +93,12 @@ class TestScalableEngine(base.DbTestCase):
# for the second task. # for the second task.
self.assertEqual(tasks[1]['state'], states.IDLE) self.assertEqual(tasks[1]['state'], states.IDLE)
self.assertEqual(states.RUNNING, self.assertEqual(states.RUNNING,
ENGINE.get_workflow_execution_state(WB_NAME, self.engine.get_workflow_execution_state(
execution['id'])) WB_NAME, execution['id']))
ENGINE.convey_task_result(WB_NAME, execution['id'], self.engine.convey_task_result(WB_NAME, execution['id'],
tasks[1]['id'], tasks[1]['id'],
states.SUCCESS, None) states.SUCCESS, None)
tasks = db_api.tasks_get(WB_NAME, execution['id']) tasks = db_api.tasks_get(WB_NAME, execution['id'])
execution = db_api.execution_get(WB_NAME, execution['id']) execution = db_api.execution_get(WB_NAME, execution['id'])
@ -97,5 +107,185 @@ class TestScalableEngine(base.DbTestCase):
self.assertEqual(tasks[0]['state'], states.SUCCESS) self.assertEqual(tasks[0]['state'], states.SUCCESS)
self.assertEqual(tasks[1]['state'], states.SUCCESS) self.assertEqual(tasks[1]['state'], states.SUCCESS)
self.assertEqual(states.SUCCESS, self.assertEqual(states.SUCCESS,
ENGINE.get_workflow_execution_state(WB_NAME, self.engine.get_workflow_execution_state(
execution['id'])) WB_NAME, execution['id']))
@mock.patch.object(engine.ScalableEngine, '_run_tasks',
mock.MagicMock(
side_effect=base.EngineTestCase.mock_run_tasks))
@mock.patch.object(actions.RestAction, "run",
mock.MagicMock(return_value={'state': states.SUCCESS}))
@mock.patch.object(db_api, "workbook_get",
mock.MagicMock(return_value={
'definition': base.get_resource("test_rest.yaml")
}))
@mock.patch.object(states, "get_state_by_http_status_code",
mock.MagicMock(return_value=states.SUCCESS))
@mock.patch.object(expressions, "evaluate",
mock.MagicMock(side_effect=lambda x, y: x))
def test_engine_sync_task(self):
execution = self.engine.start_workflow_execution(WB_NAME,
"create-vm-nova",
CONTEXT)
task = db_api.tasks_get(WB_NAME, execution['id'])[0]
execution = db_api.execution_get(WB_NAME, execution['id'])
self.assertEqual(execution['state'], states.SUCCESS)
self.assertEqual(task['state'], states.SUCCESS)
@mock.patch.object(engine.ScalableEngine, '_run_tasks',
mock.MagicMock(
side_effect=base.EngineTestCase.mock_run_tasks))
@mock.patch.object(db_api, "workbook_get",
mock.MagicMock(return_value={
'definition': base.get_resource("test_rest.yaml")
}))
@mock.patch.object(actions.RestAction, "run",
mock.MagicMock(return_value={'state': states.SUCCESS}))
@mock.patch.object(expressions, "evaluate",
mock.MagicMock(side_effect=lambda x, y: x))
def test_engine_tasks_on_success_finish(self):
# Start workflow.
execution = self.engine.start_workflow_execution(WB_NAME,
"test_subsequent",
CONTEXT)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
self.assertEqual(len(tasks), 1)
execution = db_api.execution_get(WB_NAME, execution['id'])
task = self._assert_single_item(tasks, name='test_subsequent')
# Make 'test_subsequent' task successful.
self.engine.convey_task_result(WB_NAME, execution['id'],
task['id'],
states.SUCCESS, None)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
self.assertEqual(len(tasks), 4)
self._assert_single_item(tasks,
name='test_subsequent',
state=states.SUCCESS)
self._assert_single_item(tasks,
name='attach-volumes',
state=states.IDLE)
tasks2 = self._assert_multiple_items(tasks, 2,
name='create-vms',
state=states.RUNNING)
# Make 2 'create-vms' tasks successful.
self.engine.convey_task_result(WB_NAME, execution['id'],
tasks2[0]['id'],
states.SUCCESS, None)
self.engine.convey_task_result(WB_NAME, execution['id'],
tasks2[1]['id'],
states.SUCCESS, None)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
self._assert_multiple_items(tasks, 2,
name='create-vms',
state=states.SUCCESS)
task = self._assert_single_item(tasks,
name='attach-volumes',
state=states.RUNNING)
# Make 'attach-volumes' task successful.
self.engine.convey_task_result(WB_NAME, execution['id'],
task['id'],
states.SUCCESS, None)
execution = db_api.execution_get(WB_NAME, execution['id'])
tasks = db_api.tasks_get(WB_NAME, execution['id'])
self.assertEqual(execution['state'], states.SUCCESS)
self._assert_multiple_items(tasks, 4, state=states.SUCCESS)
@mock.patch.object(engine.ScalableEngine, '_run_tasks',
mock.MagicMock(
side_effect=base.EngineTestCase.mock_run_tasks))
@mock.patch.object(db_api, "workbook_get",
mock.MagicMock(return_value={
'definition': base.get_resource("test_rest.yaml")
}))
@mock.patch.object(actions.RestAction, "run",
mock.MagicMock(return_value={'state': states.SUCCESS}))
@mock.patch.object(expressions, "evaluate",
mock.MagicMock(side_effect=lambda x, y: x))
def test_engine_tasks_on_error_finish(self):
# Start workflow.
execution = self.engine.start_workflow_execution(WB_NAME,
"test_subsequent",
CONTEXT)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
execution = db_api.execution_get(WB_NAME, execution['id'])
# Make 'test_subsequent' task successful.
self.engine.convey_task_result(WB_NAME, execution['id'],
tasks[0]['id'],
states.ERROR, None)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
self.assertEqual(len(tasks), 6)
self._assert_single_item(tasks,
name='backup-vms',
state=states.IDLE)
self._assert_single_item(tasks,
name='test_subsequent',
state=states.ERROR)
self._assert_single_item(tasks,
name='attach-volumes',
state=states.IDLE)
tasks2 = self._assert_multiple_items(tasks, 3,
name='create-vms',
state=states.RUNNING)
# Make 'create-vms' tasks successful.
self.engine.convey_task_result(WB_NAME, execution['id'],
tasks2[0]['id'],
states.SUCCESS, None)
self.engine.convey_task_result(WB_NAME, execution['id'],
tasks2[1]['id'],
states.SUCCESS, None)
self.engine.convey_task_result(WB_NAME, execution['id'],
tasks2[2]['id'],
states.SUCCESS, None)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
task1 = self._assert_single_item(tasks,
name='backup-vms',
state=states.RUNNING)
task2 = self._assert_single_item(tasks,
name='attach-volumes',
state=states.RUNNING)
self._assert_multiple_items(tasks, 3,
name='create-vms',
state=states.SUCCESS)
# Make tasks 'backup-vms' and 'attach-volumes' successful.
self.engine.convey_task_result(WB_NAME, execution['id'],
task1['id'],
states.SUCCESS, None)
self.engine.convey_task_result(WB_NAME, execution['id'],
task2['id'],
states.SUCCESS, None)
execution = db_api.execution_get(WB_NAME, execution['id'])
self.assertEqual(execution['state'], states.SUCCESS)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
self._assert_single_item(tasks, state=states.ERROR)
self._assert_multiple_items(tasks, 5, state=states.SUCCESS)

View File

@ -22,8 +22,8 @@ from mistral.openstack.common import log as logging
from mistral.openstack.common import importutils from mistral.openstack.common import importutils
from mistral.tests import base from mistral.tests import base
from mistral.db import api as db_api from mistral.db import api as db_api
from mistral.engine.scalable import engine
from mistral.engine.actions import actions from mistral.engine.actions import actions
from mistral.engine.local import engine
from mistral.engine import states from mistral.engine import states
from mistral.utils.openstack import keystone from mistral.utils.openstack import keystone
@ -33,7 +33,6 @@ from mistral.utils.openstack import keystone
importutils.import_module("mistral.config") importutils.import_module("mistral.config")
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
ENGINE = engine.get_engine()
TOKEN = "123ab" TOKEN = "123ab"
USER_ID = "321ba" USER_ID = "321ba"
@ -49,7 +48,9 @@ CONTEXT = {
} }
} }
cfg.CONF.pecan.auth_enable = False # Use the set_default method to set value otherwise in certain test cases
# the change in value is not permanent.
cfg.CONF.set_default('auth_enable', False, group='pecan')
def create_workbook(definition_path): def create_workbook(definition_path):
@ -59,13 +60,16 @@ def create_workbook(definition_path):
}) })
class DataFlowTest(base.DbTestCase): class DataFlowTest(base.EngineTestCase):
@mock.patch.object(engine.ScalableEngine, '_run_tasks',
mock.MagicMock(
side_effect=base.EngineTestCase.mock_run_tasks))
def test_two_dependent_tasks(self): def test_two_dependent_tasks(self):
wb = create_workbook('data_flow/two_dependent_tasks.yaml') wb = create_workbook('data_flow/two_dependent_tasks.yaml')
execution = ENGINE.start_workflow_execution(wb['name'], execution = self.engine.start_workflow_execution(wb['name'],
'build_greeting', 'build_greeting',
CONTEXT) CONTEXT)
# We have to reread execution to get its latest version. # We have to reread execution to get its latest version.
execution = db_api.execution_get(execution['workbook_name'], execution = db_api.execution_get(execution['workbook_name'],
@ -123,12 +127,15 @@ class DataFlowTest(base.DbTestCase):
self.assertDictEqual(CONTEXT, build_greeting_task['in_context']) self.assertDictEqual(CONTEXT, build_greeting_task['in_context'])
@mock.patch.object(engine.ScalableEngine, '_run_tasks',
mock.MagicMock(
side_effect=base.EngineTestCase.mock_run_tasks))
def test_task_with_two_dependencies(self): def test_task_with_two_dependencies(self):
wb = create_workbook('data_flow/task_with_two_dependencies.yaml') wb = create_workbook('data_flow/task_with_two_dependencies.yaml')
execution = ENGINE.start_workflow_execution(wb['name'], execution = self.engine.start_workflow_execution(wb['name'],
'send_greeting', 'send_greeting',
CONTEXT) CONTEXT)
# We have to reread execution to get its latest version. # We have to reread execution to get its latest version.
execution = db_api.execution_get(execution['workbook_name'], execution = db_api.execution_get(execution['workbook_name'],
@ -212,12 +219,15 @@ class DataFlowTest(base.DbTestCase):
}, },
send_greeting_task['output']) send_greeting_task['output'])
@mock.patch.object(engine.ScalableEngine, '_run_tasks',
mock.MagicMock(
side_effect=base.EngineTestCase.mock_run_tasks))
def test_two_subsequent_tasks(self): def test_two_subsequent_tasks(self):
wb = create_workbook('data_flow/two_subsequent_tasks.yaml') wb = create_workbook('data_flow/two_subsequent_tasks.yaml')
execution = ENGINE.start_workflow_execution(wb['name'], execution = self.engine.start_workflow_execution(wb['name'],
'build_full_name', 'build_full_name',
CONTEXT) CONTEXT)
# We have to reread execution to get its latest version. # We have to reread execution to get its latest version.
execution = db_api.execution_get(execution['workbook_name'], execution = db_api.execution_get(execution['workbook_name'],
@ -276,12 +286,15 @@ class DataFlowTest(base.DbTestCase):
self.assertDictEqual(CONTEXT, build_greeting_task['in_context']) self.assertDictEqual(CONTEXT, build_greeting_task['in_context'])
@mock.patch.object(engine.ScalableEngine, '_run_tasks',
mock.MagicMock(
side_effect=base.EngineTestCase.mock_run_tasks))
def test_three_subsequent_tasks(self): def test_three_subsequent_tasks(self):
wb = create_workbook('data_flow/three_subsequent_tasks.yaml') wb = create_workbook('data_flow/three_subsequent_tasks.yaml')
execution = ENGINE.start_workflow_execution(wb['name'], execution = self.engine.start_workflow_execution(wb['name'],
'build_full_name', 'build_full_name',
CONTEXT) CONTEXT)
# We have to reread execution to get its latest version. # We have to reread execution to get its latest version.
execution = db_api.execution_get(execution['workbook_name'], execution = db_api.execution_get(execution['workbook_name'],
@ -377,21 +390,24 @@ class DataFlowTest(base.DbTestCase):
mock.Mock( mock.Mock(
return_value=mock.MagicMock(user_id=USER_ID, return_value=mock.MagicMock(user_id=USER_ID,
auth_token=TOKEN))) auth_token=TOKEN)))
@mock.patch.object(engine.ScalableEngine, '_run_tasks',
mock.MagicMock(
side_effect=base.EngineTestCase.mock_run_tasks))
def test_add_token_to_context(self): def test_add_token_to_context(self):
cfg.CONF.pecan.auth_enable = True cfg.CONF.pecan.auth_enable = True
task_name = "create-vms" task_name = "create-vms"
workbook = create_workbook("test_rest.yaml") workbook = create_workbook("test_rest.yaml")
db_api.workbook_update(workbook['name'], {'trust_id': '123'}) db_api.workbook_update(workbook['name'], {'trust_id': '123'})
execution = ENGINE.start_workflow_execution(workbook['name'], execution = self.engine.start_workflow_execution(workbook['name'],
task_name, {}) task_name, {})
tasks = db_api.tasks_get(workbook['name'], execution['id']) tasks = db_api.tasks_get(workbook['name'], execution['id'])
task = self._assert_single_item(tasks, name=task_name) task = self._assert_single_item(tasks, name=task_name)
context = task['in_context'] context = task['in_context']
self.assertIn("auth_token", context) self.assertIn("auth_token", context)
self.assertEqual(TOKEN, context['auth_token']) self.assertEqual(TOKEN, context['auth_token'])
self.assertEqual(USER_ID, context["user_id"]) self.assertEqual(USER_ID, context["user_id"])
ENGINE.convey_task_result(workbook['name'], execution['id'], self.engine.convey_task_result(workbook['name'], execution['id'],
task['id'], states.SUCCESS, {}) task['id'], states.SUCCESS, {})
execution = db_api.execution_get(workbook['name'], execution['id']) execution = db_api.execution_get(workbook['name'], execution['id'])
self.assertEqual(states.SUCCESS, execution['state']) self.assertEqual(states.SUCCESS, execution['state'])
cfg.CONF.pecan.auth_enable = False cfg.CONF.pecan.auth_enable = False

View File

@ -14,21 +14,27 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import mock
import eventlet import eventlet
from oslo.config import cfg from oslo.config import cfg
from mistral.db import api as db_api from mistral.openstack.common import log as logging
from mistral.tests import base
from mistral.engine.local import engine
from mistral import dsl_parser
from mistral.openstack.common import importutils from mistral.openstack.common import importutils
from mistral.tests import base
from mistral.db import api as db_api
from mistral.engine.scalable import engine
from mistral import dsl_parser
# We need to make sure that all configuration properties are registered. # We need to make sure that all configuration properties are registered.
importutils.import_module("mistral.config") importutils.import_module("mistral.config")
cfg.CONF.pecan.auth_enable = False
ENGINE = engine.get_engine() LOG = logging.getLogger(__name__)
# Use the set_default method to set value otherwise in certain test cases
# the change in value is not permanent.
cfg.CONF.set_default('auth_enable', False, group='pecan')
def create_workbook(workbook_name, definition_path): def create_workbook(workbook_name, definition_path):
@ -38,11 +44,14 @@ def create_workbook(workbook_name, definition_path):
}) })
class RepeatTaskTest(base.DbTestCase): class RepeatTaskTest(base.EngineTestCase):
@mock.patch.object(engine.ScalableEngine, '_run_tasks',
mock.MagicMock(
side_effect=base.EngineTestCase.mock_run_tasks))
def test_simple_repeat_task(self): def test_simple_repeat_task(self):
wb = create_workbook('wb_1', 'repeat_task/single_repeat_task.yaml') wb = create_workbook('wb_1', 'repeat_task/single_repeat_task.yaml')
execution = ENGINE.start_workflow_execution(wb['name'], execution = self.engine.start_workflow_execution(wb['name'],
'repeater_task', None) 'repeater_task', None)
wb_spec = dsl_parser.get_workbook(wb['definition']) wb_spec = dsl_parser.get_workbook(wb['definition'])
iterations, _, delay = wb_spec.tasks.get('repeater_task').\ iterations, _, delay = wb_spec.tasks.get('repeater_task').\
get_repeat_task_parameters() get_repeat_task_parameters()
@ -54,27 +63,36 @@ class RepeatTaskTest(base.DbTestCase):
self._assert_single_item(tasks, task_runtime_context={ self._assert_single_item(tasks, task_runtime_context={
"iteration_no": 2}) "iteration_no": 2})
@mock.patch.object(engine.ScalableEngine, '_run_tasks',
mock.MagicMock(
side_effect=base.EngineTestCase.mock_run_tasks))
def test_no_repeat_task(self): def test_no_repeat_task(self):
wb = create_workbook('wb_2', 'repeat_task/no_repeat_task.yaml') wb = create_workbook('wb_2', 'repeat_task/no_repeat_task.yaml')
execution = ENGINE.start_workflow_execution(wb['name'], execution = self.engine.start_workflow_execution(wb['name'],
'repeater_task', None) 'repeater_task', None)
tasks = db_api.tasks_get(wb['name'], execution['id']) tasks = db_api.tasks_get(wb['name'], execution['id'])
self._assert_single_item(tasks, name='repeater_task') self._assert_single_item(tasks, name='repeater_task')
self._assert_single_item(tasks, task_runtime_context={ self._assert_single_item(tasks, task_runtime_context={
"iteration_no": -1}) "iteration_no": -1})
@mock.patch.object(engine.ScalableEngine, '_run_tasks',
mock.MagicMock(
side_effect=base.EngineTestCase.mock_run_tasks))
def test_break_early_repeat_task(self): def test_break_early_repeat_task(self):
wb = create_workbook('wb_3', 'repeat_task/single_repeat_task.yaml') wb = create_workbook('wb_3', 'repeat_task/single_repeat_task.yaml')
execution = ENGINE.start_workflow_execution( execution = self.engine.start_workflow_execution(
wb['name'], 'repeater_task_break_early', None) wb['name'], 'repeater_task_break_early', None)
tasks = db_api.tasks_get(wb['name'], execution['id']) tasks = db_api.tasks_get(wb['name'], execution['id'])
self._assert_single_item(tasks, name='repeater_task_break_early') self._assert_single_item(tasks, name='repeater_task_break_early')
self._assert_single_item(tasks, task_runtime_context={ self._assert_single_item(tasks, task_runtime_context={
"iteration_no": 0}) "iteration_no": 0})
@mock.patch.object(engine.ScalableEngine, '_run_tasks',
mock.MagicMock(
side_effect=base.EngineTestCase.mock_run_tasks))
def test_from_no_repeat_to_repeat_task(self): def test_from_no_repeat_to_repeat_task(self):
wb = create_workbook('wb_4', 'repeat_task/single_repeat_task.yaml') wb = create_workbook('wb_4', 'repeat_task/single_repeat_task.yaml')
execution = ENGINE.start_workflow_execution( execution = self.engine.start_workflow_execution(
wb['name'], 'not_repeat_task', None) wb['name'], 'not_repeat_task', None)
wb_spec = dsl_parser.get_workbook(wb['definition']) wb_spec = dsl_parser.get_workbook(wb['definition'])
iterations, _, delay = wb_spec.tasks.get('repeater_task').\ iterations, _, delay = wb_spec.tasks.get('repeater_task').\