diff --git a/mistral/engine/local/__init__.py b/mistral/engine/local/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/mistral/engine/local/engine.py b/mistral/engine/local/engine.py deleted file mode 100644 index 8f8c8b4c5..000000000 --- a/mistral/engine/local/engine.py +++ /dev/null @@ -1,67 +0,0 @@ -# Copyright (c) 2013 Mirantis Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from mistral import exceptions as exc -from mistral.engine import abstract_engine as abs_eng -from mistral.engine import states -from mistral.engine.actions import action_factory as a_f -from mistral.engine.actions import action_helper as a_h -from mistral.db import api as db_api -from mistral.openstack.common import log as logging - - -LOG = logging.getLogger(__name__) - - -class LocalEngine(abs_eng.AbstractEngine): - @classmethod - def _run_tasks(cls, tasks): - LOG.info("Workflow is running, tasks to run: %s" % tasks) - for t in tasks: - cls._run_task(t) - - @classmethod - def _run_task(cls, task): - action = a_f.create_action(task) - - LOG.info("Task is started - %s" % task['name']) - - if a_h.is_task_synchronous(task): - try: - state, result = states.SUCCESS, action.run() - except exc.ActionException: - state, result = states.ERROR, None - - cls.convey_task_result(task['workbook_name'], - task['execution_id'], - task['id'], - state, result) - else: - try: - action.run() - - db_api.task_update(task['workbook_name'], - task['execution_id'], - task['id'], - {'state': states.RUNNING}) - except exc.ActionException: - cls.convey_task_result(task['workbook_name'], - task['execution_id'], - task['id'], - states.ERROR, None) - - -def get_engine(): - return LocalEngine diff --git a/mistral/tests/base.py b/mistral/tests/base.py index e0b1ffbae..14ba6a326 100644 --- a/mistral/tests/base.py +++ b/mistral/tests/base.py @@ -28,6 +28,10 @@ from oslo.config import cfg from oslo import messaging from oslo.messaging import transport +from mistral.engine import engine +from mistral.engine.scalable.executor import server +from mistral.engine.scalable import engine as concrete_engine + RESOURCES_PATH = 'tests/resources/' @@ -104,3 +108,22 @@ class DbTestCase(BaseTest): def is_db_session_open(self): return db_api._get_thread_local_session() is not None + + +class EngineTestCase(DbTestCase): + def __init__(self, *args, **kwargs): + super(EngineTestCase, self).__init__(*args, **kwargs) + self.transport = get_fake_transport() + engine.load_engine(self.transport) + self.engine = concrete_engine.get_engine() + self.engine.transport = self.transport + + @classmethod + def mock_run_tasks(cls, tasks): + """ + Mock the engine _run_tasks to send requests directly to the task + executor instead of going through the oslo.messaging transport. + """ + executor = server.Executor() + for task in tasks: + executor.handle_task({}, task=task) diff --git a/mistral/tests/unit/engine/local/__init__.py b/mistral/tests/unit/engine/local/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/mistral/tests/unit/engine/local/test_engine.py b/mistral/tests/unit/engine/local/test_engine.py deleted file mode 100644 index 9198cbe09..000000000 --- a/mistral/tests/unit/engine/local/test_engine.py +++ /dev/null @@ -1,306 +0,0 @@ -# Copyright (c) 2013 Mirantis Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import mock - -from mistral.db import api as db_api -from mistral.engine.actions import actions -from mistral.engine import expressions -from mistral.engine.local import engine -from mistral.engine import states -from mistral.openstack.common import importutils -from mistral.tests import base - - -importutils.import_module("mistral.config") -ENGINE = engine.get_engine() - -WB_NAME = "my_workbook" -CONTEXT = None # TODO(rakhmerov): Use a meaningful value. - -#TODO(rakhmerov): add more tests for errors, execution stop etc. - - -def print_tasks(tasks): - for t in tasks: - print t['name'], t['state'] - - -class TestLocalEngine(base.DbTestCase): - @mock.patch.object(db_api, "workbook_get", - mock.MagicMock(return_value={ - 'definition': base.get_resource("test_rest.yaml") - })) - @mock.patch.object(actions.RestAction, "run", - mock.MagicMock(return_value={'state': states.RUNNING})) - @mock.patch.object(expressions, "evaluate", - mock.MagicMock(side_effect=lambda x, y: x)) - def test_engine_one_task(self): - # Start workflow. - execution = ENGINE.start_workflow_execution(WB_NAME, "create-vms", - CONTEXT) - - tasks = db_api.tasks_get(WB_NAME, execution['id']) - - self.assertEqual(1, len(tasks)) - self._assert_single_item(tasks, - name='create-vms', - state=states.RUNNING) - - # Make 'create-vms' task successful. - ENGINE.convey_task_result(WB_NAME, execution['id'], tasks[0]['id'], - states.SUCCESS, None) - - execution = db_api.execution_get(WB_NAME, execution['id']) - - self.assertEqual(execution['state'], states.SUCCESS) - - tasks = db_api.tasks_get(WB_NAME, execution['id']) - - self.assertEqual(1, len(tasks)) - self._assert_single_item(tasks, - name='create-vms', - state=states.SUCCESS) - - @mock.patch.object(db_api, "workbook_get", - mock.MagicMock(return_value={ - 'definition': base.get_resource("test_rest.yaml") - })) - @mock.patch.object(actions.RestAction, "run", - mock.MagicMock(return_value={'state': states.RUNNING})) - @mock.patch.object(expressions, "evaluate", - mock.MagicMock(side_effect=lambda x, y: x)) - def test_engine_multiple_tasks(self): - # Start workflow. - execution = ENGINE.start_workflow_execution(WB_NAME, "backup-vms", - CONTEXT) - - tasks = db_api.tasks_get(WB_NAME, execution['id']) - - task = self._assert_single_item(tasks, - name='create-vms', - state=states.RUNNING) - - # Make 'create-vms' task successful. - ENGINE.convey_task_result(WB_NAME, execution['id'], - task['id'], - states.SUCCESS, None) - - tasks = db_api.tasks_get(WB_NAME, execution['id']) - - self.assertEqual(2, len(tasks)) - - self._assert_single_item(tasks, - name='create-vms', - state=states.SUCCESS) - self._assert_single_item(tasks, - name='backup-vms', - state=states.RUNNING) - - self.assertEqual(states.RUNNING, - ENGINE.get_workflow_execution_state(WB_NAME, - execution['id'])) - - task = self._assert_single_item(tasks, name='backup-vms') - - # Make 'backup-vms' task successful. - ENGINE.convey_task_result(WB_NAME, execution['id'], - task['id'], - states.SUCCESS, None) - - execution = db_api.execution_get(WB_NAME, execution['id']) - - self.assertEqual(execution['state'], states.SUCCESS) - - tasks = db_api.tasks_get(WB_NAME, execution['id']) - - self._assert_single_item(tasks, - name='create-vms', - state=states.SUCCESS) - self._assert_single_item(tasks, - name='backup-vms', - state=states.SUCCESS) - - self.assertEqual(states.SUCCESS, - ENGINE.get_workflow_execution_state(WB_NAME, - execution['id'])) - - @mock.patch.object(actions.RestAction, "run", - mock.MagicMock(return_value={'state': states.SUCCESS})) - @mock.patch.object(db_api, "workbook_get", - mock.MagicMock(return_value={ - 'definition': base.get_resource("test_rest.yaml") - })) - @mock.patch.object(states, "get_state_by_http_status_code", - mock.MagicMock(return_value=states.SUCCESS)) - @mock.patch.object(expressions, "evaluate", - mock.MagicMock(side_effect=lambda x, y: x)) - def test_engine_sync_task(self): - execution = ENGINE.start_workflow_execution(WB_NAME, "create-vm-nova", - CONTEXT) - - task = db_api.tasks_get(WB_NAME, execution['id'])[0] - execution = db_api.execution_get(WB_NAME, execution['id']) - - self.assertEqual(execution['state'], states.SUCCESS) - self.assertEqual(task['state'], states.SUCCESS) - - @mock.patch.object(db_api, "workbook_get", - mock.MagicMock(return_value={ - 'definition': base.get_resource("test_rest.yaml") - })) - @mock.patch.object(actions.RestAction, "run", - mock.MagicMock(return_value={'state': states.SUCCESS})) - @mock.patch.object(expressions, "evaluate", - mock.MagicMock(side_effect=lambda x, y: x)) - def test_engine_tasks_on_success_finish(self): - # Start workflow. - execution = ENGINE.start_workflow_execution(WB_NAME, "test_subsequent", - CONTEXT) - tasks = db_api.tasks_get(WB_NAME, execution['id']) - - self.assertEqual(len(tasks), 1) - - execution = db_api.execution_get(WB_NAME, execution['id']) - - task = self._assert_single_item(tasks, name='test_subsequent') - - # Make 'test_subsequent' task successful. - ENGINE.convey_task_result(WB_NAME, execution['id'], - task['id'], - states.SUCCESS, None) - - tasks = db_api.tasks_get(WB_NAME, execution['id']) - - self.assertEqual(len(tasks), 4) - - self._assert_single_item(tasks, - name='test_subsequent', - state=states.SUCCESS) - self._assert_single_item(tasks, - name='attach-volumes', - state=states.IDLE) - - tasks2 = self._assert_multiple_items(tasks, 2, - name='create-vms', - state=states.RUNNING) - - # Make 2 'create-vms' tasks successful. - ENGINE.convey_task_result(WB_NAME, execution['id'], - tasks2[0]['id'], - states.SUCCESS, None) - ENGINE.convey_task_result(WB_NAME, execution['id'], - tasks2[1]['id'], - states.SUCCESS, None) - - tasks = db_api.tasks_get(WB_NAME, execution['id']) - - self._assert_multiple_items(tasks, 2, - name='create-vms', - state=states.SUCCESS) - task = self._assert_single_item(tasks, - name='attach-volumes', - state=states.RUNNING) - - # Make 'attach-volumes' task successful. - ENGINE.convey_task_result(WB_NAME, execution['id'], - task['id'], - states.SUCCESS, None) - - execution = db_api.execution_get(WB_NAME, execution['id']) - tasks = db_api.tasks_get(WB_NAME, execution['id']) - - self.assertEqual(execution['state'], states.SUCCESS) - self._assert_multiple_items(tasks, 4, state=states.SUCCESS) - - @mock.patch.object(db_api, "workbook_get", - mock.MagicMock(return_value={ - 'definition': base.get_resource("test_rest.yaml") - })) - @mock.patch.object(actions.RestAction, "run", - mock.MagicMock(return_value={'state': states.SUCCESS})) - @mock.patch.object(expressions, "evaluate", - mock.MagicMock(side_effect=lambda x, y: x)) - def test_engine_tasks_on_error_finish(self): - # Start workflow. - execution = ENGINE.start_workflow_execution(WB_NAME, "test_subsequent", - CONTEXT) - - tasks = db_api.tasks_get(WB_NAME, execution['id']) - execution = db_api.execution_get(WB_NAME, execution['id']) - - # Make 'test_subsequent' task successful. - ENGINE.convey_task_result(WB_NAME, execution['id'], - tasks[0]['id'], - states.ERROR, None) - - tasks = db_api.tasks_get(WB_NAME, execution['id']) - - self.assertEqual(len(tasks), 6) - - self._assert_single_item(tasks, - name='backup-vms', - state=states.IDLE) - self._assert_single_item(tasks, - name='test_subsequent', - state=states.ERROR) - self._assert_single_item(tasks, - name='attach-volumes', - state=states.IDLE) - - tasks2 = self._assert_multiple_items(tasks, 3, - name='create-vms', - state=states.RUNNING) - - # Make 'create-vms' tasks successful. - ENGINE.convey_task_result(WB_NAME, execution['id'], - tasks2[0]['id'], - states.SUCCESS, None) - ENGINE.convey_task_result(WB_NAME, execution['id'], - tasks2[1]['id'], - states.SUCCESS, None) - ENGINE.convey_task_result(WB_NAME, execution['id'], - tasks2[2]['id'], - states.SUCCESS, None) - - tasks = db_api.tasks_get(WB_NAME, execution['id']) - - task1 = self._assert_single_item(tasks, - name='backup-vms', - state=states.RUNNING) - task2 = self._assert_single_item(tasks, - name='attach-volumes', - state=states.RUNNING) - - self._assert_multiple_items(tasks, 3, - name='create-vms', - state=states.SUCCESS) - - # Make tasks 'backup-vms' and 'attach-volumes' successful. - ENGINE.convey_task_result(WB_NAME, execution['id'], - task1['id'], - states.SUCCESS, None) - ENGINE.convey_task_result(WB_NAME, execution['id'], - task2['id'], - states.SUCCESS, None) - - execution = db_api.execution_get(WB_NAME, execution['id']) - - self.assertEqual(execution['state'], states.SUCCESS) - - tasks = db_api.tasks_get(WB_NAME, execution['id']) - - self._assert_single_item(tasks, state=states.ERROR) - self._assert_multiple_items(tasks, 5, state=states.SUCCESS) diff --git a/mistral/tests/unit/engine/scalable/test_engine.py b/mistral/tests/unit/engine/scalable/test_engine.py index ccbff4c77..2077967f9 100644 --- a/mistral/tests/unit/engine/scalable/test_engine.py +++ b/mistral/tests/unit/engine/scalable/test_engine.py @@ -15,23 +15,33 @@ import mock +from oslo.config import cfg + +from mistral.openstack.common import log as logging +from mistral.openstack.common import importutils from mistral.db import api as db_api from mistral.engine.actions import actions +from mistral.engine import expressions from mistral.engine.scalable import engine from mistral.engine import states from mistral.tests import base -ENGINE = engine.get_engine() +# We need to make sure that all configuration properties are registered. +importutils.import_module("mistral.config") +LOG = logging.getLogger(__name__) WB_NAME = "my_workbook" CONTEXT = None # TODO(rakhmerov): Use a meaningful value. +# Use the set_default method to set value otherwise in certain test cases +# the change in value is not permanent. +cfg.CONF.set_default('auth_enable', False, group='pecan') #TODO(rakhmerov): add more tests for errors, execution stop etc. -class TestScalableEngine(base.DbTestCase): +class TestScalableEngine(base.EngineTestCase): @mock.patch.object(engine.ScalableEngine, "_notify_task_executors", mock.MagicMock(return_value="")) @mock.patch.object(db_api, "workbook_get", @@ -41,13 +51,13 @@ class TestScalableEngine(base.DbTestCase): @mock.patch.object(actions.RestAction, "run", mock.MagicMock(return_value="result")) def test_engine_one_task(self): - execution = ENGINE.start_workflow_execution(WB_NAME, "create-vms", - CONTEXT) + execution = self.engine.start_workflow_execution(WB_NAME, "create-vms", + CONTEXT) task = db_api.tasks_get(WB_NAME, execution['id'])[0] - ENGINE.convey_task_result(WB_NAME, execution['id'], task['id'], - states.SUCCESS, None) + self.engine.convey_task_result(WB_NAME, execution['id'], task['id'], + states.SUCCESS, None) task = db_api.tasks_get(WB_NAME, execution['id'])[0] execution = db_api.execution_get(WB_NAME, execution['id']) @@ -64,14 +74,14 @@ class TestScalableEngine(base.DbTestCase): @mock.patch.object(actions.RestAction, "run", mock.MagicMock(return_value="result")) def test_engine_multiple_tasks(self): - execution = ENGINE.start_workflow_execution(WB_NAME, "backup-vms", - CONTEXT) + execution = self.engine.start_workflow_execution(WB_NAME, "backup-vms", + CONTEXT) tasks = db_api.tasks_get(WB_NAME, execution['id']) - ENGINE.convey_task_result(WB_NAME, execution['id'], - tasks[0]['id'], - states.SUCCESS, None) + self.engine.convey_task_result(WB_NAME, execution['id'], + tasks[0]['id'], + states.SUCCESS, None) tasks = db_api.tasks_get(WB_NAME, execution['id']) @@ -83,12 +93,12 @@ class TestScalableEngine(base.DbTestCase): # for the second task. self.assertEqual(tasks[1]['state'], states.IDLE) self.assertEqual(states.RUNNING, - ENGINE.get_workflow_execution_state(WB_NAME, - execution['id'])) + self.engine.get_workflow_execution_state( + WB_NAME, execution['id'])) - ENGINE.convey_task_result(WB_NAME, execution['id'], - tasks[1]['id'], - states.SUCCESS, None) + self.engine.convey_task_result(WB_NAME, execution['id'], + tasks[1]['id'], + states.SUCCESS, None) tasks = db_api.tasks_get(WB_NAME, execution['id']) execution = db_api.execution_get(WB_NAME, execution['id']) @@ -97,5 +107,185 @@ class TestScalableEngine(base.DbTestCase): self.assertEqual(tasks[0]['state'], states.SUCCESS) self.assertEqual(tasks[1]['state'], states.SUCCESS) self.assertEqual(states.SUCCESS, - ENGINE.get_workflow_execution_state(WB_NAME, - execution['id'])) + self.engine.get_workflow_execution_state( + WB_NAME, execution['id'])) + + @mock.patch.object(engine.ScalableEngine, '_run_tasks', + mock.MagicMock( + side_effect=base.EngineTestCase.mock_run_tasks)) + @mock.patch.object(actions.RestAction, "run", + mock.MagicMock(return_value={'state': states.SUCCESS})) + @mock.patch.object(db_api, "workbook_get", + mock.MagicMock(return_value={ + 'definition': base.get_resource("test_rest.yaml") + })) + @mock.patch.object(states, "get_state_by_http_status_code", + mock.MagicMock(return_value=states.SUCCESS)) + @mock.patch.object(expressions, "evaluate", + mock.MagicMock(side_effect=lambda x, y: x)) + def test_engine_sync_task(self): + execution = self.engine.start_workflow_execution(WB_NAME, + "create-vm-nova", + CONTEXT) + + task = db_api.tasks_get(WB_NAME, execution['id'])[0] + execution = db_api.execution_get(WB_NAME, execution['id']) + + self.assertEqual(execution['state'], states.SUCCESS) + self.assertEqual(task['state'], states.SUCCESS) + + @mock.patch.object(engine.ScalableEngine, '_run_tasks', + mock.MagicMock( + side_effect=base.EngineTestCase.mock_run_tasks)) + @mock.patch.object(db_api, "workbook_get", + mock.MagicMock(return_value={ + 'definition': base.get_resource("test_rest.yaml") + })) + @mock.patch.object(actions.RestAction, "run", + mock.MagicMock(return_value={'state': states.SUCCESS})) + @mock.patch.object(expressions, "evaluate", + mock.MagicMock(side_effect=lambda x, y: x)) + def test_engine_tasks_on_success_finish(self): + # Start workflow. + execution = self.engine.start_workflow_execution(WB_NAME, + "test_subsequent", + CONTEXT) + tasks = db_api.tasks_get(WB_NAME, execution['id']) + + self.assertEqual(len(tasks), 1) + + execution = db_api.execution_get(WB_NAME, execution['id']) + + task = self._assert_single_item(tasks, name='test_subsequent') + + # Make 'test_subsequent' task successful. + self.engine.convey_task_result(WB_NAME, execution['id'], + task['id'], + states.SUCCESS, None) + + tasks = db_api.tasks_get(WB_NAME, execution['id']) + + self.assertEqual(len(tasks), 4) + + self._assert_single_item(tasks, + name='test_subsequent', + state=states.SUCCESS) + self._assert_single_item(tasks, + name='attach-volumes', + state=states.IDLE) + + tasks2 = self._assert_multiple_items(tasks, 2, + name='create-vms', + state=states.RUNNING) + + # Make 2 'create-vms' tasks successful. + self.engine.convey_task_result(WB_NAME, execution['id'], + tasks2[0]['id'], + states.SUCCESS, None) + self.engine.convey_task_result(WB_NAME, execution['id'], + tasks2[1]['id'], + states.SUCCESS, None) + + tasks = db_api.tasks_get(WB_NAME, execution['id']) + + self._assert_multiple_items(tasks, 2, + name='create-vms', + state=states.SUCCESS) + task = self._assert_single_item(tasks, + name='attach-volumes', + state=states.RUNNING) + + # Make 'attach-volumes' task successful. + self.engine.convey_task_result(WB_NAME, execution['id'], + task['id'], + states.SUCCESS, None) + + execution = db_api.execution_get(WB_NAME, execution['id']) + tasks = db_api.tasks_get(WB_NAME, execution['id']) + + self.assertEqual(execution['state'], states.SUCCESS) + self._assert_multiple_items(tasks, 4, state=states.SUCCESS) + + @mock.patch.object(engine.ScalableEngine, '_run_tasks', + mock.MagicMock( + side_effect=base.EngineTestCase.mock_run_tasks)) + @mock.patch.object(db_api, "workbook_get", + mock.MagicMock(return_value={ + 'definition': base.get_resource("test_rest.yaml") + })) + @mock.patch.object(actions.RestAction, "run", + mock.MagicMock(return_value={'state': states.SUCCESS})) + @mock.patch.object(expressions, "evaluate", + mock.MagicMock(side_effect=lambda x, y: x)) + def test_engine_tasks_on_error_finish(self): + # Start workflow. + execution = self.engine.start_workflow_execution(WB_NAME, + "test_subsequent", + CONTEXT) + + tasks = db_api.tasks_get(WB_NAME, execution['id']) + execution = db_api.execution_get(WB_NAME, execution['id']) + + # Make 'test_subsequent' task successful. + self.engine.convey_task_result(WB_NAME, execution['id'], + tasks[0]['id'], + states.ERROR, None) + + tasks = db_api.tasks_get(WB_NAME, execution['id']) + + self.assertEqual(len(tasks), 6) + + self._assert_single_item(tasks, + name='backup-vms', + state=states.IDLE) + self._assert_single_item(tasks, + name='test_subsequent', + state=states.ERROR) + self._assert_single_item(tasks, + name='attach-volumes', + state=states.IDLE) + + tasks2 = self._assert_multiple_items(tasks, 3, + name='create-vms', + state=states.RUNNING) + + # Make 'create-vms' tasks successful. + self.engine.convey_task_result(WB_NAME, execution['id'], + tasks2[0]['id'], + states.SUCCESS, None) + self.engine.convey_task_result(WB_NAME, execution['id'], + tasks2[1]['id'], + states.SUCCESS, None) + self.engine.convey_task_result(WB_NAME, execution['id'], + tasks2[2]['id'], + states.SUCCESS, None) + + tasks = db_api.tasks_get(WB_NAME, execution['id']) + + task1 = self._assert_single_item(tasks, + name='backup-vms', + state=states.RUNNING) + task2 = self._assert_single_item(tasks, + name='attach-volumes', + state=states.RUNNING) + + self._assert_multiple_items(tasks, 3, + name='create-vms', + state=states.SUCCESS) + + # Make tasks 'backup-vms' and 'attach-volumes' successful. + self.engine.convey_task_result(WB_NAME, execution['id'], + task1['id'], + states.SUCCESS, None) + self.engine.convey_task_result(WB_NAME, execution['id'], + task2['id'], + states.SUCCESS, None) + + execution = db_api.execution_get(WB_NAME, execution['id']) + + self.assertEqual(execution['state'], states.SUCCESS) + + tasks = db_api.tasks_get(WB_NAME, execution['id']) + + self._assert_single_item(tasks, state=states.ERROR) + self._assert_multiple_items(tasks, 5, state=states.SUCCESS) diff --git a/mistral/tests/unit/engine/test_data_flow.py b/mistral/tests/unit/engine/test_data_flow.py index 2b657c68a..fded879f0 100644 --- a/mistral/tests/unit/engine/test_data_flow.py +++ b/mistral/tests/unit/engine/test_data_flow.py @@ -22,8 +22,8 @@ from mistral.openstack.common import log as logging from mistral.openstack.common import importutils from mistral.tests import base from mistral.db import api as db_api +from mistral.engine.scalable import engine from mistral.engine.actions import actions -from mistral.engine.local import engine from mistral.engine import states from mistral.utils.openstack import keystone @@ -33,7 +33,6 @@ from mistral.utils.openstack import keystone importutils.import_module("mistral.config") LOG = logging.getLogger(__name__) -ENGINE = engine.get_engine() TOKEN = "123ab" USER_ID = "321ba" @@ -49,7 +48,9 @@ CONTEXT = { } } -cfg.CONF.pecan.auth_enable = False +# Use the set_default method to set value otherwise in certain test cases +# the change in value is not permanent. +cfg.CONF.set_default('auth_enable', False, group='pecan') def create_workbook(definition_path): @@ -59,13 +60,16 @@ def create_workbook(definition_path): }) -class DataFlowTest(base.DbTestCase): +class DataFlowTest(base.EngineTestCase): + @mock.patch.object(engine.ScalableEngine, '_run_tasks', + mock.MagicMock( + side_effect=base.EngineTestCase.mock_run_tasks)) def test_two_dependent_tasks(self): wb = create_workbook('data_flow/two_dependent_tasks.yaml') - execution = ENGINE.start_workflow_execution(wb['name'], - 'build_greeting', - CONTEXT) + execution = self.engine.start_workflow_execution(wb['name'], + 'build_greeting', + CONTEXT) # We have to reread execution to get its latest version. execution = db_api.execution_get(execution['workbook_name'], @@ -123,12 +127,15 @@ class DataFlowTest(base.DbTestCase): self.assertDictEqual(CONTEXT, build_greeting_task['in_context']) + @mock.patch.object(engine.ScalableEngine, '_run_tasks', + mock.MagicMock( + side_effect=base.EngineTestCase.mock_run_tasks)) def test_task_with_two_dependencies(self): wb = create_workbook('data_flow/task_with_two_dependencies.yaml') - execution = ENGINE.start_workflow_execution(wb['name'], - 'send_greeting', - CONTEXT) + execution = self.engine.start_workflow_execution(wb['name'], + 'send_greeting', + CONTEXT) # We have to reread execution to get its latest version. execution = db_api.execution_get(execution['workbook_name'], @@ -212,12 +219,15 @@ class DataFlowTest(base.DbTestCase): }, send_greeting_task['output']) + @mock.patch.object(engine.ScalableEngine, '_run_tasks', + mock.MagicMock( + side_effect=base.EngineTestCase.mock_run_tasks)) def test_two_subsequent_tasks(self): wb = create_workbook('data_flow/two_subsequent_tasks.yaml') - execution = ENGINE.start_workflow_execution(wb['name'], - 'build_full_name', - CONTEXT) + execution = self.engine.start_workflow_execution(wb['name'], + 'build_full_name', + CONTEXT) # We have to reread execution to get its latest version. execution = db_api.execution_get(execution['workbook_name'], @@ -276,12 +286,15 @@ class DataFlowTest(base.DbTestCase): self.assertDictEqual(CONTEXT, build_greeting_task['in_context']) + @mock.patch.object(engine.ScalableEngine, '_run_tasks', + mock.MagicMock( + side_effect=base.EngineTestCase.mock_run_tasks)) def test_three_subsequent_tasks(self): wb = create_workbook('data_flow/three_subsequent_tasks.yaml') - execution = ENGINE.start_workflow_execution(wb['name'], - 'build_full_name', - CONTEXT) + execution = self.engine.start_workflow_execution(wb['name'], + 'build_full_name', + CONTEXT) # We have to reread execution to get its latest version. execution = db_api.execution_get(execution['workbook_name'], @@ -377,21 +390,24 @@ class DataFlowTest(base.DbTestCase): mock.Mock( return_value=mock.MagicMock(user_id=USER_ID, auth_token=TOKEN))) + @mock.patch.object(engine.ScalableEngine, '_run_tasks', + mock.MagicMock( + side_effect=base.EngineTestCase.mock_run_tasks)) def test_add_token_to_context(self): cfg.CONF.pecan.auth_enable = True task_name = "create-vms" workbook = create_workbook("test_rest.yaml") db_api.workbook_update(workbook['name'], {'trust_id': '123'}) - execution = ENGINE.start_workflow_execution(workbook['name'], - task_name, {}) + execution = self.engine.start_workflow_execution(workbook['name'], + task_name, {}) tasks = db_api.tasks_get(workbook['name'], execution['id']) task = self._assert_single_item(tasks, name=task_name) context = task['in_context'] self.assertIn("auth_token", context) self.assertEqual(TOKEN, context['auth_token']) self.assertEqual(USER_ID, context["user_id"]) - ENGINE.convey_task_result(workbook['name'], execution['id'], - task['id'], states.SUCCESS, {}) + self.engine.convey_task_result(workbook['name'], execution['id'], + task['id'], states.SUCCESS, {}) execution = db_api.execution_get(workbook['name'], execution['id']) self.assertEqual(states.SUCCESS, execution['state']) cfg.CONF.pecan.auth_enable = False diff --git a/mistral/tests/unit/engine/test_repeat_task.py b/mistral/tests/unit/engine/test_repeat_task.py index b9e9084e2..0259d481d 100644 --- a/mistral/tests/unit/engine/test_repeat_task.py +++ b/mistral/tests/unit/engine/test_repeat_task.py @@ -14,21 +14,27 @@ # See the License for the specific language governing permissions and # limitations under the License. +import mock + import eventlet from oslo.config import cfg -from mistral.db import api as db_api -from mistral.tests import base -from mistral.engine.local import engine -from mistral import dsl_parser +from mistral.openstack.common import log as logging from mistral.openstack.common import importutils +from mistral.tests import base +from mistral.db import api as db_api +from mistral.engine.scalable import engine +from mistral import dsl_parser # We need to make sure that all configuration properties are registered. importutils.import_module("mistral.config") -cfg.CONF.pecan.auth_enable = False -ENGINE = engine.get_engine() +LOG = logging.getLogger(__name__) + +# Use the set_default method to set value otherwise in certain test cases +# the change in value is not permanent. +cfg.CONF.set_default('auth_enable', False, group='pecan') def create_workbook(workbook_name, definition_path): @@ -38,11 +44,14 @@ def create_workbook(workbook_name, definition_path): }) -class RepeatTaskTest(base.DbTestCase): +class RepeatTaskTest(base.EngineTestCase): + @mock.patch.object(engine.ScalableEngine, '_run_tasks', + mock.MagicMock( + side_effect=base.EngineTestCase.mock_run_tasks)) def test_simple_repeat_task(self): wb = create_workbook('wb_1', 'repeat_task/single_repeat_task.yaml') - execution = ENGINE.start_workflow_execution(wb['name'], - 'repeater_task', None) + execution = self.engine.start_workflow_execution(wb['name'], + 'repeater_task', None) wb_spec = dsl_parser.get_workbook(wb['definition']) iterations, _, delay = wb_spec.tasks.get('repeater_task').\ get_repeat_task_parameters() @@ -54,27 +63,36 @@ class RepeatTaskTest(base.DbTestCase): self._assert_single_item(tasks, task_runtime_context={ "iteration_no": 2}) + @mock.patch.object(engine.ScalableEngine, '_run_tasks', + mock.MagicMock( + side_effect=base.EngineTestCase.mock_run_tasks)) def test_no_repeat_task(self): wb = create_workbook('wb_2', 'repeat_task/no_repeat_task.yaml') - execution = ENGINE.start_workflow_execution(wb['name'], - 'repeater_task', None) + execution = self.engine.start_workflow_execution(wb['name'], + 'repeater_task', None) tasks = db_api.tasks_get(wb['name'], execution['id']) self._assert_single_item(tasks, name='repeater_task') self._assert_single_item(tasks, task_runtime_context={ "iteration_no": -1}) + @mock.patch.object(engine.ScalableEngine, '_run_tasks', + mock.MagicMock( + side_effect=base.EngineTestCase.mock_run_tasks)) def test_break_early_repeat_task(self): wb = create_workbook('wb_3', 'repeat_task/single_repeat_task.yaml') - execution = ENGINE.start_workflow_execution( + execution = self.engine.start_workflow_execution( wb['name'], 'repeater_task_break_early', None) tasks = db_api.tasks_get(wb['name'], execution['id']) self._assert_single_item(tasks, name='repeater_task_break_early') self._assert_single_item(tasks, task_runtime_context={ "iteration_no": 0}) + @mock.patch.object(engine.ScalableEngine, '_run_tasks', + mock.MagicMock( + side_effect=base.EngineTestCase.mock_run_tasks)) def test_from_no_repeat_to_repeat_task(self): wb = create_workbook('wb_4', 'repeat_task/single_repeat_task.yaml') - execution = ENGINE.start_workflow_execution( + execution = self.engine.start_workflow_execution( wb['name'], 'not_repeat_task', None) wb_spec = dsl_parser.get_workbook(wb['definition']) iterations, _, delay = wb_spec.tasks.get('repeater_task').\