Fix repeatable task scheduling

* Fixed repeatable task scheduling. See bug description for
   more information.
 * Small fix in data flow test

 Fixes bug: #1301866

Change-Id: I40e653ac242df0f5deee8b72ac77a7c04e4fda2a
This commit is contained in:
Nikolay Mahotkin 2014-04-03 17:27:10 +04:00
parent 697f67d7e7
commit 6342ffb6ab
5 changed files with 48 additions and 10 deletions

View File

@ -55,7 +55,8 @@ class AbstractEngine(object):
workbook_name, execution['id']
)
tasks_to_start = workflow.find_resolved_tasks(tasks)
tasks_to_start, delayed_tasks = workflow.find_resolved_tasks(tasks)
context = cls._add_token_to_context(
context, db_api.workbook_get(workbook_name))
data_flow.prepare_tasks(tasks_to_start, context)
@ -68,6 +69,9 @@ class AbstractEngine(object):
finally:
db_api.end_tx()
for task in delayed_tasks:
cls._schedule_run(workbook, task, context)
cls._run_tasks(tasks_to_start)
return execution
@ -105,7 +109,8 @@ class AbstractEngine(object):
LOG.info("Changed execution state: %s" % execution)
tasks_to_start = workflow.find_resolved_tasks(tasks)
tasks_to_start, delayed_tasks = workflow.find_resolved_tasks(tasks)
outbound_context = cls._add_token_to_context(
outbound_context, db_api.workbook_get(workbook_name))
data_flow.prepare_tasks(tasks_to_start, outbound_context)
@ -121,7 +126,7 @@ class AbstractEngine(object):
if states.is_stopped_or_finished(execution["state"]):
return task
if task['state'] == states.DELAYED:
for task in delayed_tasks:
cls._schedule_run(workbook, task, outbound_context)
if tasks_to_start:

View File

@ -137,6 +137,7 @@ def _update_dependencies(tasks, graph):
def _get_resolved_tasks(tasks):
resolved_tasks = []
delayed_tasks = []
allows = []
for t in tasks:
if t['state'] == states.SUCCESS:
@ -147,4 +148,6 @@ def _get_resolved_tasks(tasks):
if len(set(deps) - allow_set) == 0:
if t['state'] == states.IDLE:
resolved_tasks.append(t)
return resolved_tasks
elif t['state'] == states.DELAYED:
delayed_tasks.append(t)
return resolved_tasks, delayed_tasks

View File

@ -10,8 +10,8 @@ Workflow:
tasks:
repeater_task:
repeat:
iterations: 5
delay: 0
iterations: 3
delay: 1
action: MyService:repeatable-action
publish:
greet_msg: greeting
@ -24,3 +24,9 @@ Workflow:
action: MyService:repeatable-action
publish:
greet_msg: greeting
not_repeat_task:
action: MyService:repeatable-action
publish:
greet_msg: greeting
on-success: repeater_task

View File

@ -49,7 +49,7 @@ CONTEXT = {
}
}
cfg.CONF.pecan.auth_token = False
cfg.CONF.pecan.auth_enable = False
def create_workbook(definition_path):

View File

@ -14,10 +14,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import eventlet
from oslo.config import cfg
from mistral.db import api as db_api
from mistral.tests import base
from mistral.engine.local import engine
from oslo.config import cfg
from mistral import dsl_parser
from mistral.openstack.common import importutils
# We need to make sure that all configuration properties are registered.
@ -35,15 +39,20 @@ def create_workbook(workbook_name, definition_path):
class RepeatTaskTest(base.DbTestCase):
def test_simple_repeat_task(self):
wb = create_workbook('wb_1', 'repeat_task/single_repeat_task.yaml')
execution = ENGINE.start_workflow_execution(wb['name'],
'repeater_task', None)
wb_spec = dsl_parser.get_workbook(wb['definition'])
iterations, _, delay = wb_spec.tasks.get('repeater_task').\
get_repeat_task_parameters()
# Wait until all iterations are finished
eventlet.sleep(delay * iterations + 1)
tasks = db_api.tasks_get(wb['name'], execution['id'])
self._assert_single_item(tasks, name='repeater_task')
self._assert_single_item(tasks, task_runtime_context={
"iteration_no": 4})
"iteration_no": 2})
def test_no_repeat_task(self):
wb = create_workbook('wb_2', 'repeat_task/no_repeat_task.yaml')
@ -62,3 +71,18 @@ class RepeatTaskTest(base.DbTestCase):
self._assert_single_item(tasks, name='repeater_task_break_early')
self._assert_single_item(tasks, task_runtime_context={
"iteration_no": 0})
def test_from_no_repeat_to_repeat_task(self):
wb = create_workbook('wb_4', 'repeat_task/single_repeat_task.yaml')
execution = ENGINE.start_workflow_execution(
wb['name'], 'not_repeat_task', None)
wb_spec = dsl_parser.get_workbook(wb['definition'])
iterations, _, delay = wb_spec.tasks.get('repeater_task').\
get_repeat_task_parameters()
# Wait until all iterations are finished
eventlet.sleep(delay * iterations + 1)
tasks = db_api.tasks_get(wb['name'], execution['id'])
self._assert_multiple_items(tasks, 2)
self._assert_single_item(tasks, name='repeater_task')
self._assert_single_item(tasks, task_runtime_context=None)