From 86c60dfa60496eaccf16dc50d13ac39f0cae451b Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Fri, 13 Sep 2013 20:21:03 -0700 Subject: [PATCH] Use executors instead of pools A executor is a more generic concept than a pool and also seems to work better with eventlet. This is also a more supported model of performing concurrent operations than using the mostly unknown multiprocessing thread pool implementation. Fixes: bug 1225275 Change-Id: I09e9a9000bc88cc57d51342b83b31f97790a62e9 --- taskflow/engines/action_engine/engine.py | 52 +++++++++++-------- .../engines/action_engine/parallel_action.py | 4 +- taskflow/tests/unit/test_action_engine.py | 15 +++--- tools/pip-requires | 2 + 4 files changed, 43 insertions(+), 30 deletions(-) diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 92ceb5a8c..018d0f909 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -16,9 +16,10 @@ # License for the specific language governing permissions and limitations # under the License. +import multiprocessing import threading -from multiprocessing import pool +from concurrent import futures from taskflow.engines.action_engine import parallel_action from taskflow.engines.action_engine import seq_action @@ -175,7 +176,7 @@ class MultiThreadedActionEngine(ActionEngine): translator_cls = MultiThreadedTranslator def __init__(self, flow, flow_detail=None, book=None, backend=None, - thread_pool=None): + executor=None): if flow_detail is None: flow_detail = p_utils.create_flow_detail(flow, book=book, @@ -183,31 +184,40 @@ class MultiThreadedActionEngine(ActionEngine): ActionEngine.__init__(self, flow, storage=t_storage.ThreadSafeStorage(flow_detail, backend)) - if thread_pool: - self._thread_pool = thread_pool - self._owns_thread_pool = False + if executor is not None: + self._executor = executor + self._owns_executor = False + self._thread_count = -1 else: - self._thread_pool = None - self._owns_thread_pool = True - - @decorators.locked - def compile(self): - ActionEngine.compile(self) - if self._thread_pool is None: - self._thread_pool = pool.ThreadPool() + self._executor = None + self._owns_executor = True + # TODO(harlowja): allow this to be configurable?? + try: + self._thread_count = multiprocessing.cpu_count() + 1 + except NotImplementedError: + # NOTE(harlowja): apparently may raise so in this case we will + # just setup two threads since its hard to know what else we + # should do in this situation. + self._thread_count = 2 @decorators.locked def run(self): + if self._owns_executor: + if self._executor is not None: + # The previous shutdown failed, something is very wrong. + raise exc.InvalidStateException("The previous shutdown() of" + " the executor powering this" + " engine failed. Something is" + " very very wrong.") + self._executor = futures.ThreadPoolExecutor(self._thread_count) try: ActionEngine.run(self) finally: - # Ensure we close then join on the thread pool to make sure its - # resources get cleaned up correctly. - if self._owns_thread_pool and self._thread_pool: - self._thread_pool.close() - self._thread_pool.join() - self._thread_pool = None + # Don't forget to shutdown the executor!! + if self._owns_executor and self._executor is not None: + self._executor.shutdown(wait=True) + self._executor = None @property - def thread_pool(self): - return self._thread_pool + def executor(self): + return self._executor diff --git a/taskflow/engines/action_engine/parallel_action.py b/taskflow/engines/action_engine/parallel_action.py index 345c64d12..5c4762d8e 100644 --- a/taskflow/engines/action_engine/parallel_action.py +++ b/taskflow/engines/action_engine/parallel_action.py @@ -29,7 +29,7 @@ class ParallelAction(base.Action): self._actions.append(action) def _map(self, engine, fn): - pool = engine.thread_pool + executor = engine.executor def call_fn(action): try: @@ -40,7 +40,7 @@ class ParallelAction(base.Action): return None failures = [] - result_iter = pool.imap_unordered(call_fn, self._actions) + result_iter = executor.map(call_fn, self._actions) for result in result_iter: if isinstance(result, misc.Failure): failures.append(result) diff --git a/taskflow/tests/unit/test_action_engine.py b/taskflow/tests/unit/test_action_engine.py index 1314c9edc..4e46777fa 100644 --- a/taskflow/tests/unit/test_action_engine.py +++ b/taskflow/tests/unit/test_action_engine.py @@ -17,9 +17,10 @@ # under the License. import contextlib -from multiprocessing import pool import time +from concurrent import futures + from taskflow.patterns import linear_flow as lf from taskflow.patterns import unordered_flow as uf @@ -447,20 +448,20 @@ class MultiThreadedEngineTest(EngineTaskTest, EngineLinearFlowTest, EngineParallelFlowTest, test.TestCase): - def _make_engine(self, flow, flow_detail=None, thread_pool=None): + def _make_engine(self, flow, flow_detail=None, executor=None): if flow_detail is None: flow_detail = p_utils.create_flow_detail(flow, self.book, self.backend) return eng.MultiThreadedActionEngine(flow, backend=self.backend, flow_detail=flow_detail, - thread_pool=thread_pool) + executor=executor) def test_using_common_pool(self): flow = TestTask(self.values, name='task1') - thread_pool = pool.ThreadPool() - e1 = self._make_engine(flow, thread_pool=thread_pool) - e2 = self._make_engine(flow, thread_pool=thread_pool) - self.assertIs(e1.thread_pool, e2.thread_pool) + executor = futures.ThreadPoolExecutor(2) + e1 = self._make_engine(flow, executor=executor) + e2 = self._make_engine(flow, executor=executor) + self.assertIs(e1.executor, e2.executor) def test_parallel_revert_specific(self): flow = uf.Flow('p-r-r').add( diff --git a/tools/pip-requires b/tools/pip-requires index 116f9987e..14fad9cc1 100644 --- a/tools/pip-requires +++ b/tools/pip-requires @@ -11,3 +11,5 @@ threading2 Babel>=0.9.6 # Used for backend storage engine loading stevedore>=0.10 +# Backport for concurrent.futures which exists in 3.2+ +futures>=2.1.3