Initial implementation of action-based engine

This is first in series of changes that provides separation of flows into
engines and pattens. Patterns define structure, while engine define how
the task should be run according to said structure.

This change adds the basic patterns and a very simple engine that
is able to run flow defined with patterns by converting it to recursive
structure of so-called 'actions'.

For simplicity and ease of review certain major features like
resumption, saving and passing on task results and notifications are
left out for farther changes.

Partially implements blueprint patterns-and-engines

Co-authored-by: Anastasia Karpinska <akarpinska at griddynamics.com>
Change-Id: I68515d8a5b30d5d047bf9beb67cc3e2111175190
This commit is contained in:
Ivan A. Melnikov 2013-08-20 12:25:43 +04:00 committed by Joshua Harlow
parent 1ae6b9776a
commit 57c4b1bdc2
13 changed files with 538 additions and 3 deletions

View File

@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Blocks define *structure*
There are two categories of blocks:
- patterns, which provide convenient way to express basic flow
structure, like linear flow or parallel flow
- terminals, which run task or needed for housekeeping
"""
# Import most used blocks into this module namespace:
from taskflow.blocks.patterns import LinearFlow # noqa
from taskflow.blocks.patterns import ParallelFlow # noqa
from taskflow.blocks.task import Task # noqa

25
taskflow/blocks/base.py Normal file
View File

@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class Block(object):
"""Basic flow structure unit
From blocks the flow definition is build.
"""

View File

@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from taskflow.blocks import base
class Pattern(base.Block):
"""Base class for patterns that can contain nested blocks
Patterns put child blocks into *structure*.
"""
def __init__(self):
super(Pattern, self).__init__()
self._children = []
@property
def children(self):
return self._children
def add(self, *children):
self._children.extend(children)
return self
class LinearFlow(Pattern):
"""Linear (sequential) pattern
Children of this pattern should be executed one after another,
in order. Every child implicitly depends on all the children
before it.
"""
class ParallelFlow(Pattern):
"""Parallel (unordered) pattern
Children of this pattern are independent, and thus can be
executed in any order or in parallel.
"""

47
taskflow/blocks/task.py Normal file
View File

@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Terminal blocks that actually run code
"""
from taskflow.blocks import base
from taskflow.openstack.common import uuidutils
class Task(base.Block):
"""A block that wraps a single task
The task should be executed, and produced results should be saved.
"""
def __init__(self, task, uuid=None):
super(Task, self).__init__()
self._task = task
if uuid is None:
self._id = uuidutils.generate_uuid()
else:
self._id = str(uuid)
@property
def task(self):
return self._task
@property
def uuid(self):
return self._id

View File

@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
class Action(object):
"""Basic action class
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def execute(self, engine):
"""Run the action"""
@abc.abstractmethod
def revert(self, engine):
"""Undo all side effects of execute method"""

View File

@ -0,0 +1,58 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from taskflow.engines.action_engine import seq_action
from taskflow.engines.action_engine import task_action
from taskflow import blocks
from taskflow import states
class ActionEngine(object):
"""Generic action-based engine
Converts the flow to recursive structure of actions.
"""
def __init__(self, flow, action_map):
self._action_map = action_map
self._root = self._to_action(flow)
def _to_action(self, pattern):
try:
factory = self._action_map[type(pattern)]
except KeyError:
raise ValueError('Action of unknown type: %s (type %s)'
% (pattern, type(pattern)))
return factory(pattern, self._to_action)
def run(self):
status = self._root.execute(self)
if status == states.FAILURE:
self._root.revert(self)
class SingleThreadedActionEngine(ActionEngine):
def __init__(self, flow):
ActionEngine.__init__(self, flow, {
blocks.Task: task_action.TaskAction,
blocks.LinearFlow: seq_action.SequentialAction,
blocks.ParallelFlow: seq_action.SequentialAction
})

View File

@ -0,0 +1,43 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from taskflow.engines.action_engine import base_action as base
from taskflow import states
class SequentialAction(base.Action):
def __init__(self, pattern, to_action):
self._history = []
self._actions = [to_action(pat) for pat in pattern.children]
def execute(self, engine):
state = states.SUCCESS
for action in self._actions:
#TODO(imelnikov): save history to storage
self._history.append(action)
state = action.execute(engine)
if state != states.SUCCESS:
break
return state
def revert(self, engine):
while self._history:
action = self._history[-1]
action.revert(engine)
self._history.pop()

View File

@ -0,0 +1,59 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from taskflow.engines.action_engine import base_action as base
from taskflow import states
import sys
class TaskAction(base.Action):
def __init__(self, block, _to_action):
self._task = block.task
if isinstance(self._task, type):
self._task = self._task()
self.state = states.PENDING
def execute(self, engine):
# TODO(imelnikov): notifications
self.state = states.RUNNING
try:
# TODO(imelnikov): pass only necessary args to task
self._task.execute()
except Exception:
# TODO(imelnikov): save exception information
print sys.exc_info()
self.state = states.FAILURE
else:
self.state = states.SUCCESS
return self.state
def revert(self, engine):
if self.state == states.PENDING: # pragma: no cover
# NOTE(imelnikov): in all the other states, the task
# execution was at least attempted, so we should give
# task a chance for cleanup
return
try:
self._task.revert()
except Exception:
self.state = states.FAILURE
raise
else:
self.state = states.PENDING

View File

@ -20,7 +20,6 @@
import celery
import logging
LOG = logging.getLogger(__name__)

View File

@ -0,0 +1,148 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from taskflow import blocks
from taskflow import task
from taskflow import test
from taskflow.engines.action_engine import engine as eng
class TestTask(task.Task):
def __init__(self, values, name):
super(TestTask, self).__init__(name)
self.values = values
def execute(self, **kwargs):
self.values.append(self.name)
return 5
def revert(self, **kwargs):
self.values.append(self.name + ' reverted')
class FailingTask(TestTask):
def execute(self, **kwargs):
raise RuntimeError('Woot!')
class NeverRunningTask(task.Task):
def execute(self, **kwargs):
assert False, 'This method should not be called'
def revert(self, **kwargs):
assert False, 'This method should not be called'
class NastyTask(task.Task):
def execute(self, **kwargs):
pass
def revert(self, **kwargs):
raise RuntimeError('Gotcha!')
class EngineTestBase(object):
def setUp(self):
super(EngineTestBase, self).setUp()
self.values = []
def _make_engine(self, _flow):
raise NotImplementedError()
class EngineTaskTest(EngineTestBase):
def test_run_task_as_flow(self):
flow = blocks.Task(TestTask(self.values, name='task1'))
self._make_engine(flow).run()
self.assertEquals(self.values, ['task1'])
def test_invalid_block_raises(self):
value = 'i am string, not block, sorry'
flow = blocks.LinearFlow().add(value)
with self.assertRaises(ValueError) as err:
self._make_engine(flow)
self.assertIn(value, str(err.exception))
class EngineLinearFlowTest(EngineTestBase):
def test_sequential_flow_one_task(self):
flow = blocks.LinearFlow().add(
blocks.Task(TestTask(self.values, name='task1'))
)
self._make_engine(flow).run()
self.assertEquals(self.values, ['task1'])
def test_sequential_flow_two_tasks(self):
flow = blocks.LinearFlow().add(
blocks.Task(TestTask(self.values, name='task1')),
blocks.Task(TestTask(self.values, name='task2'))
)
self._make_engine(flow).run()
self.assertEquals(self.values, ['task1', 'task2'])
def test_sequential_flow_nested_blocks(self):
flow = blocks.LinearFlow().add(
blocks.Task(TestTask(self.values, 'task1')),
blocks.LinearFlow().add(
blocks.Task(TestTask(self.values, 'task2'))
)
)
self._make_engine(flow).run()
self.assertEquals(self.values, ['task1', 'task2'])
def test_revert_exception_is_reraised(self):
flow = blocks.LinearFlow().add(
blocks.Task(NastyTask),
blocks.Task(FailingTask(self.values, 'fail'))
)
engine = self._make_engine(flow)
with self.assertRaisesRegexp(RuntimeError, '^Gotcha'):
engine.run()
def test_revert_not_run_task_is_not_reverted(self):
flow = blocks.LinearFlow().add(
blocks.Task(FailingTask(self.values, 'fail')),
blocks.Task(NeverRunningTask)
)
self._make_engine(flow).run()
self.assertEquals(self.values, ['fail reverted'])
def test_correctly_reverts_children(self):
flow = blocks.LinearFlow().add(
blocks.Task(TestTask(self.values, 'task1')),
blocks.LinearFlow().add(
blocks.Task(TestTask(self.values, 'task2')),
blocks.Task(FailingTask(self.values, 'fail'))
)
)
engine = self._make_engine(flow)
engine.run()
self.assertEquals(self.values, ['task1', 'task2',
'fail reverted',
'task2 reverted', 'task1 reverted'])
class SingleThreadedEngineTest(EngineTaskTest,
EngineLinearFlowTest,
test.TestCase):
def _make_engine(self, flow):
return eng.SingleThreadedActionEngine(flow)

View File

@ -39,6 +39,6 @@ setenv = NOSE_WITH_COVERAGE=1
commands = {posargs}
[flake8]
ignore = H402,H302
ignore = H402
builtins = _
exclude = .venv,.tox,dist,doc,*openstack/common*,*egg,.git,build,tools