From ae9c701f9073941fbe063d2b7854ff6eed5b5fc0 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Tue, 14 Jul 2015 16:13:06 -0700 Subject: [PATCH] Add an executor backed conductor and have existing impl. use it This adds an executor backed job dispatching base class and has the existing blocking conductor use it by running jobs and dispatching jobs into a sync executor. It also allows for dispatching jobs into a thread executor, or other executor via a new '_executor_factory' method that can generate executors (it can be overridden in the non-blocking conductor to provide your own executor instances). This does alter the behavior in that now that jobs are dispatched into an executor we can no longer immediately know if a job was dispatched and raised an exception or whether it will raise an exception in the future, so we now alter the 'local_dispatched' to just be a boolean that is used to determine if any dispatches happened (failure or not). Change-Id: I485770e8f4c85d3833892a453c9fb5168d8f0407 --- doc/source/conductors.rst | 10 +- setup.cfg | 1 + taskflow/conductors/backends/impl_blocking.py | 258 +------------- taskflow/conductors/backends/impl_executor.py | 333 ++++++++++++++++++ .../conductors/backends/impl_nonblocking.py | 69 ++++ taskflow/conductors/base.py | 31 +- taskflow/jobs/base.py | 7 +- taskflow/tests/unit/conductor/__init__.py | 0 .../test_blocking.py => test_conductors.py} | 58 ++- taskflow/types/timing.py | 4 +- taskflow/utils/misc.py | 14 + taskflow/utils/threading_utils.py | 12 + 12 files changed, 543 insertions(+), 254 deletions(-) create mode 100644 taskflow/conductors/backends/impl_executor.py create mode 100644 taskflow/conductors/backends/impl_nonblocking.py delete mode 100644 taskflow/tests/unit/conductor/__init__.py rename taskflow/tests/unit/{conductor/test_blocking.py => test_conductors.py} (75%) diff --git a/doc/source/conductors.rst b/doc/source/conductors.rst index d6d99a2c9..5d78d53fd 100644 --- a/doc/source/conductors.rst +++ 
b/doc/source/conductors.rst @@ -9,7 +9,7 @@ Conductors Overview ======== -Conductors in TaskFlow provide a mechanism that unifies the various TaskFlow +Conductors provide a mechanism that unifies the various concepts under a single easy to use (as plug-and-play as we can make it) construct. @@ -66,6 +66,7 @@ Interfaces .. automodule:: taskflow.conductors.base .. automodule:: taskflow.conductors.backends +.. automodule:: taskflow.conductors.backends.impl_executor Implementations =============== @@ -75,12 +76,19 @@ Blocking .. automodule:: taskflow.conductors.backends.impl_blocking +Non-blocking +------------ + +.. automodule:: taskflow.conductors.backends.impl_nonblocking + Hierarchy ========= .. inheritance-diagram:: taskflow.conductors.base taskflow.conductors.backends.impl_blocking + taskflow.conductors.backends.impl_nonblocking + taskflow.conductors.backends.impl_executor :parts: 1 .. _musical conductors: http://en.wikipedia.org/wiki/Conducting diff --git a/setup.cfg b/setup.cfg index ff2415533..f903e7b55 100644 --- a/setup.cfg +++ b/setup.cfg @@ -37,6 +37,7 @@ taskflow.jobboards = taskflow.conductors = blocking = taskflow.conductors.backends.impl_blocking:BlockingConductor + nonblocking = taskflow.conductors.backends.impl_nonblocking:NonBlockingConductor taskflow.persistence = dir = taskflow.persistence.backends.impl_dir:DirBackend diff --git a/taskflow/conductors/backends/impl_blocking.py b/taskflow/conductors/backends/impl_blocking.py index 3fd5cb922..797338a08 100644 --- a/taskflow/conductors/backends/impl_blocking.py +++ b/taskflow/conductors/backends/impl_blocking.py @@ -12,254 +12,30 @@ # License for the specific language governing permissions and limitations # under the License. 
-import os -import socket +import futurist -import threading - -try: - from contextlib import ExitStack # noqa -except ImportError: - from contextlib2 import ExitStack # noqa - -from debtcollector import removals -from oslo_utils import excutils -import six -from taskflow.conductors import base -from taskflow import exceptions as excp -from taskflow.listeners import logging as logging_listener -from taskflow import logging -from taskflow.types import entity -from taskflow.types import timing as tt -from taskflow.utils import async_utils -from taskflow.utils import iter_utils - -LOG = logging.getLogger(__name__) -WAIT_TIMEOUT = 0.5 -NO_CONSUME_EXCEPTIONS = tuple([ - excp.ExecutionFailure, - excp.StorageFailure, -]) +from taskflow.conductors.backends import impl_executor -class BlockingConductor(base.Conductor): - """A conductor that runs jobs in its own dispatching loop. +class BlockingConductor(impl_executor.ExecutorConductor): + """Blocking conductor that processes job(s) in a blocking manner.""" - This conductor iterates over jobs in the provided jobboard (waiting for - the given timeout if no jobs exist) and attempts to claim them, work on - those jobs in its local thread (blocking further work from being claimed - and consumed) and then consume those work units after completion. This - process will repeat until the conductor has been stopped or other critical - error occurs. - - NOTE(harlowja): consumption occurs even if a engine fails to run due to - a task failure. This is only skipped when an execution failure or - a storage failure occurs which are *usually* correctable by re-running on - a different conductor (storage failures and execution failures may be - transient issues that can be worked around by later execution). If a job - after completing can not be consumed or abandoned the conductor relies - upon the jobboard capabilities to automatically abandon these jobs. 
+ MAX_SIMULTANEOUS_JOBS = 1 + """ + Default maximum number of jobs that can be in progress at the same time. """ - START_FINISH_EVENTS_EMITTED = tuple([ - 'compilation', 'preparation', - 'validation', 'running', - ]) - """Events will be emitted for the start and finish of each engine - activity defined above, the actual event name that can be registered - to subscribe to will be ``${event}_start`` and ``${event}_end`` where - the ``${event}`` in this pseudo-variable will be one of these events. - """ + @staticmethod + def _executor_factory(): + return futurist.SynchronousExecutor() def __init__(self, name, jobboard, persistence=None, engine=None, - engine_options=None, wait_timeout=None): + engine_options=None, wait_timeout=None, + log=None, max_simultaneous_jobs=MAX_SIMULTANEOUS_JOBS): super(BlockingConductor, self).__init__( - name, jobboard, persistence=persistence, - engine=engine, engine_options=engine_options) - if wait_timeout is None: - wait_timeout = WAIT_TIMEOUT - if isinstance(wait_timeout, (int, float) + six.string_types): - self._wait_timeout = tt.Timeout(float(wait_timeout)) - elif isinstance(wait_timeout, tt.Timeout): - self._wait_timeout = wait_timeout - else: - raise ValueError("Invalid timeout literal: %s" % (wait_timeout)) - self._dead = threading.Event() - - @removals.removed_kwarg('timeout', version="0.8", removal_version="2.0") - def stop(self, timeout=None): - """Requests the conductor to stop dispatching. - - This method can be used to request that a conductor stop its - consumption & dispatching loop. - - The method returns immediately regardless of whether the conductor has - been stopped. - - .. deprecated:: 0.8 - - The ``timeout`` parameter is **deprecated** and is present for - backward compatibility **only**. In order to wait for the - conductor to gracefully shut down, :py:meth:`wait` should be used - instead. 
- """ - self._wait_timeout.interrupt() - - @property - def dispatching(self): - return not self._dead.is_set() - - def _listeners_from_job(self, job, engine): - listeners = super(BlockingConductor, self)._listeners_from_job(job, - engine) - listeners.append(logging_listener.LoggingListener(engine, log=LOG)) - return listeners - - def _dispatch_job(self, job): - engine = self._engine_from_job(job) - listeners = self._listeners_from_job(job, engine) - with ExitStack() as stack: - for listener in listeners: - stack.enter_context(listener) - LOG.debug("Dispatching engine for job '%s'", job) - consume = True - try: - for stage_func, event_name in [(engine.compile, 'compilation'), - (engine.prepare, 'preparation'), - (engine.validate, 'validation'), - (engine.run, 'running')]: - self._notifier.notify("%s_start" % event_name, { - 'job': job, - 'engine': engine, - 'conductor': self, - }) - stage_func() - self._notifier.notify("%s_end" % event_name, { - 'job': job, - 'engine': engine, - 'conductor': self, - }) - except excp.WrappedFailure as e: - if all((f.check(*NO_CONSUME_EXCEPTIONS) for f in e)): - consume = False - if LOG.isEnabledFor(logging.WARNING): - if consume: - LOG.warn("Job execution failed (consumption being" - " skipped): %s [%s failures]", job, len(e)) - else: - LOG.warn("Job execution failed (consumption" - " proceeding): %s [%s failures]", job, len(e)) - # Show the failure/s + traceback (if possible)... - for i, f in enumerate(e): - LOG.warn("%s. 
%s", i + 1, f.pformat(traceback=True)) - except NO_CONSUME_EXCEPTIONS: - LOG.warn("Job execution failed (consumption being" - " skipped): %s", job, exc_info=True) - consume = False - except Exception: - LOG.warn("Job execution failed (consumption proceeding): %s", - job, exc_info=True) - else: - LOG.info("Job completed successfully: %s", job) - return async_utils.make_completed_future(consume) - - def _get_conductor_info(self): - """For right now we just register the conductor name as: - - @: - - """ - hostname = socket.gethostname() - pid = os.getpid() - name = '@'.join([ - self._name, hostname+":"+str(pid)]) - # Can add a lot more information here, - metadata = { - "hostname": hostname, - "pid": pid - } - - return entity.Entity("conductor", name, metadata) - - def run(self, max_dispatches=None): - self._dead.clear() - - # Register a conductor type entity - self._jobboard.register_entity(self._get_conductor_info()) - - total_dispatched = 0 - try: - - if max_dispatches is None: - # NOTE(TheSriram): if max_dispatches is not set, - # then the conductor will run indefinitely, and not - # stop after 'n' number of dispatches - max_dispatches = -1 - - dispatch_gen = iter_utils.iter_forever(max_dispatches) - - while True: - if self._wait_timeout.is_stopped(): - break - local_dispatched = 0 - for job in self._jobboard.iterjobs(): - if self._wait_timeout.is_stopped(): - break - LOG.debug("Trying to claim job: %s", job) - try: - self._jobboard.claim(job, self._name) - except (excp.UnclaimableJob, excp.NotFound): - LOG.debug("Job already claimed or consumed: %s", job) - continue - consume = False - try: - f = self._dispatch_job(job) - except KeyboardInterrupt: - with excutils.save_and_reraise_exception(): - LOG.warn("Job dispatching interrupted: %s", job) - except Exception: - LOG.warn("Job dispatching failed: %s", job, - exc_info=True) - else: - - local_dispatched += 1 - consume = f.result() - try: - if consume: - self._jobboard.consume(job, self._name) - else: - 
self._jobboard.abandon(job, self._name) - except (excp.JobFailure, excp.NotFound): - if consume: - LOG.warn("Failed job consumption: %s", job, - exc_info=True) - else: - LOG.warn("Failed job abandonment: %s", job, - exc_info=True) - - total_dispatched = next(dispatch_gen) - - if local_dispatched == 0 and \ - not self._wait_timeout.is_stopped(): - self._wait_timeout.wait() - - except StopIteration: - if max_dispatches >= 0 and total_dispatched >= max_dispatches: - LOG.info("Maximum dispatch limit of %s reached", - max_dispatches) - finally: - self._dead.set() - - def wait(self, timeout=None): - """Waits for the conductor to gracefully exit. - - This method waits for the conductor to gracefully exit. An optional - timeout can be provided, which will cause the method to return - within the specified timeout. If the timeout is reached, the returned - value will be False. - - :param timeout: Maximum number of seconds that the :meth:`wait` method - should block for. - """ - return self._dead.wait(timeout) + name, jobboard, + persistence=persistence, engine=engine, + engine_options=engine_options, + wait_timeout=wait_timeout, log=log, + max_simultaneous_jobs=max_simultaneous_jobs) diff --git a/taskflow/conductors/backends/impl_executor.py b/taskflow/conductors/backends/impl_executor.py new file mode 100644 index 000000000..c47488da0 --- /dev/null +++ b/taskflow/conductors/backends/impl_executor.py @@ -0,0 +1,333 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import functools +import itertools +import threading + +try: + from contextlib import ExitStack # noqa +except ImportError: + from contextlib2 import ExitStack # noqa + +from debtcollector import removals +from oslo_utils import excutils +import six + +from taskflow.conductors import base +from taskflow import exceptions as excp +from taskflow.listeners import logging as logging_listener +from taskflow import logging +from taskflow.types import timing as tt +from taskflow.utils import iter_utils +from taskflow.utils import misc + +LOG = logging.getLogger(__name__) + + +def _convert_to_timeout(value=None, default_value=None, event_factory=None): + if value is None: + value = default_value + if isinstance(value, (int, float) + six.string_types): + return tt.Timeout(float(value), event_factory=event_factory) + elif isinstance(value, tt.Timeout): + return value + else: + raise ValueError("Invalid timeout literal '%s'" % (value)) + + +@six.add_metaclass(abc.ABCMeta) +class ExecutorConductor(base.Conductor): + """Dispatches jobs from blocking :py:meth:`.run` method to some executor. + + This conductor iterates over jobs in the provided jobboard (waiting for + the given timeout if no jobs exist) and attempts to claim them, work on + those jobs using an executor (potentially blocking further work from being + claimed and consumed) and then consume those work units after + completion. This process will repeat until the conductor has been stopped + or other critical error occurs. + + NOTE(harlowja): consumption occurs even if a engine fails to run due to + a atom failure. This is only skipped when an execution failure or + a storage failure occurs which are *usually* correctable by re-running on + a different conductor (storage failures and execution failures may be + transient issues that can be worked around by later execution). 
If a job + after completing can not be consumed or abandoned the conductor relies + upon the jobboard capabilities to automatically abandon these jobs. + """ + + LOG = None + """ + Logger that will be used for listening to events (if none then the module + level logger will be used instead). + """ + + #: Default timeout used to idle/wait when no jobs have been found. + WAIT_TIMEOUT = 0.5 + + MAX_SIMULTANEOUS_JOBS = -1 + """ + Default maximum number of jobs that can be in progress at the same time. + + Negative or zero values imply no limit (do note that if a executor is + used that is built on a queue, as most are, that this will imply that the + queue will contain a potentially large & unfinished backlog of + submitted jobs). This *may* get better someday if + https://bugs.python.org/issue22737 is ever implemented and released. + """ + + #: Exceptions that will **not** cause consumption to occur. + NO_CONSUME_EXCEPTIONS = tuple([ + excp.ExecutionFailure, + excp.StorageFailure, + ]) + + _event_factory = threading.Event + """This attribute *can* be overridden by subclasses (for example if + an eventlet *green* event works better for the conductor user).""" + + START_FINISH_EVENTS_EMITTED = tuple([ + 'compilation', 'preparation', + 'validation', 'running', + ]) + """Events will be emitted for the start and finish of each engine + activity defined above, the actual event name that can be registered + to subscribe to will be ``${event}_start`` and ``${event}_end`` where + the ``${event}`` in this pseudo-variable will be one of these events. 
+ """ + + def __init__(self, name, jobboard, + persistence=None, engine=None, + engine_options=None, wait_timeout=None, + log=None, max_simultaneous_jobs=MAX_SIMULTANEOUS_JOBS): + super(ExecutorConductor, self).__init__( + name, jobboard, persistence=persistence, + engine=engine, engine_options=engine_options) + self._wait_timeout = _convert_to_timeout( + value=wait_timeout, default_value=self.WAIT_TIMEOUT, + event_factory=self._event_factory) + self._dead = self._event_factory() + self._log = misc.pick_first_not_none(log, self.LOG, LOG) + self._max_simultaneous_jobs = int( + misc.pick_first_not_none(max_simultaneous_jobs, + self.MAX_SIMULTANEOUS_JOBS)) + self._dispatched = set() + + def _executor_factory(self): + """Creates an executor to be used during dispatching.""" + raise excp.NotImplementedError("This method must be implemented but" + " it has not been") + + @removals.removed_kwarg('timeout', version="0.8", removal_version="2.0") + def stop(self, timeout=None): + """Requests the conductor to stop dispatching. + + This method can be used to request that a conductor stop its + consumption & dispatching loop. + + The method returns immediately regardless of whether the conductor has + been stopped. + + .. deprecated:: 0.8 + + The ``timeout`` parameter is **deprecated** and is present for + backward compatibility **only**. In order to wait for the + conductor to gracefully shut down, :py:meth:`wait` should be used + instead. 
+ """ + self._wait_timeout.interrupt() + + @property + def dispatching(self): + """Whether or not the dispatching loop is still dispatching.""" + return not self._dead.is_set() + + def _listeners_from_job(self, job, engine): + listeners = super(ExecutorConductor, self)._listeners_from_job( + job, engine) + listeners.append(logging_listener.LoggingListener(engine, + log=self._log)) + return listeners + + def _dispatch_job(self, job): + engine = self._engine_from_job(job) + listeners = self._listeners_from_job(job, engine) + with ExitStack() as stack: + for listener in listeners: + stack.enter_context(listener) + self._log.debug("Dispatching engine for job '%s'", job) + consume = True + try: + for stage_func, event_name in [(engine.compile, 'compilation'), + (engine.prepare, 'preparation'), + (engine.validate, 'validation'), + (engine.run, 'running')]: + self._notifier.notify("%s_start" % event_name, { + 'job': job, + 'engine': engine, + 'conductor': self, + }) + stage_func() + self._notifier.notify("%s_end" % event_name, { + 'job': job, + 'engine': engine, + 'conductor': self, + }) + except excp.WrappedFailure as e: + if all((f.check(*self.NO_CONSUME_EXCEPTIONS) for f in e)): + consume = False + if self._log.isEnabledFor(logging.WARNING): + if consume: + self._log.warn( + "Job execution failed (consumption being" + " skipped): %s [%s failures]", job, len(e)) + else: + self._log.warn( + "Job execution failed (consumption" + " proceeding): %s [%s failures]", job, len(e)) + # Show the failure/s + traceback (if possible)... + for i, f in enumerate(e): + self._log.warn("%s. 
%s", i + 1, + f.pformat(traceback=True)) + except self.NO_CONSUME_EXCEPTIONS: + self._log.warn("Job execution failed (consumption being" + " skipped): %s", job, exc_info=True) + consume = False + except Exception: + self._log.warn( + "Job execution failed (consumption proceeding): %s", + job, exc_info=True) + else: + self._log.info("Job completed successfully: %s", job) + return consume + + def _try_finish_job(self, job, consume): + try: + if consume: + self._jobboard.consume(job, self._name) + else: + self._jobboard.abandon(job, self._name) + except (excp.JobFailure, excp.NotFound): + if consume: + self._log.warn("Failed job consumption: %s", job, + exc_info=True) + else: + self._log.warn("Failed job abandonment: %s", job, + exc_info=True) + + def _on_job_done(self, job, fut): + consume = False + try: + consume = fut.result() + except KeyboardInterrupt: + with excutils.save_and_reraise_exception(): + self._log.warn("Job dispatching interrupted: %s", job) + except Exception: + self._log.warn("Job dispatching failed: %s", job, exc_info=True) + try: + self._try_finish_job(job, consume) + finally: + self._dispatched.discard(fut) + + def _can_claim_more_jobs(self, job): + if self._wait_timeout.is_stopped(): + return False + if self._max_simultaneous_jobs <= 0: + return True + if len(self._dispatched) >= self._max_simultaneous_jobs: + return False + else: + return True + + def _run_until_dead(self, executor, max_dispatches=None): + total_dispatched = 0 + if max_dispatches is None: + # NOTE(TheSriram): if max_dispatches is not set, + # then the conductor will run indefinitely, and not + # stop after 'n' number of dispatches + max_dispatches = -1 + dispatch_gen = iter_utils.iter_forever(max_dispatches) + is_stopped = self._wait_timeout.is_stopped + try: + # Don't even do any work in the first place... 
+ if max_dispatches == 0: + raise StopIteration + while not is_stopped(): + any_dispatched = False + for job in itertools.takewhile(self._can_claim_more_jobs, + self._jobboard.iterjobs()): + self._log.debug("Trying to claim job: %s", job) + try: + self._jobboard.claim(job, self._name) + except (excp.UnclaimableJob, excp.NotFound): + self._log.debug("Job already claimed or" + " consumed: %s", job) + else: + try: + fut = executor.submit(self._dispatch_job, job) + except RuntimeError: + with excutils.save_and_reraise_exception(): + self._log.warn("Job dispatch submitting" + " failed: %s", job) + self._try_finish_job(job, False) + else: + fut.job = job + self._dispatched.add(fut) + any_dispatched = True + fut.add_done_callback( + functools.partial(self._on_job_done, job)) + total_dispatched = next(dispatch_gen) + if not any_dispatched and not is_stopped(): + self._wait_timeout.wait() + except StopIteration: + # This will be raised from 'dispatch_gen' if it reaches its + # max dispatch number (which implies we should do no more work). + with excutils.save_and_reraise_exception(): + if max_dispatches >= 0 and total_dispatched >= max_dispatches: + self._log.info("Maximum dispatch limit of %s reached", + max_dispatches) + + def run(self, max_dispatches=None): + self._dead.clear() + self._dispatched.clear() + try: + self._jobboard.register_entity(self.conductor) + with self._executor_factory() as executor: + self._run_until_dead(executor, + max_dispatches=max_dispatches) + except StopIteration: + pass + except KeyboardInterrupt: + with excutils.save_and_reraise_exception(): + self._log.warn("Job dispatching interrupted") + finally: + self._dead.set() + + # Inherit the docs, so we can reference them in our class docstring, + # if we don't do this sphinx gets confused... + run.__doc__ = base.Conductor.run.__doc__ + + def wait(self, timeout=None): + """Waits for the conductor to gracefully exit. + + This method waits for the conductor to gracefully exit. 
An optional + timeout can be provided, which will cause the method to return + within the specified timeout. If the timeout is reached, the returned + value will be ``False``, otherwise it will be ``True``. + + :param timeout: Maximum number of seconds that the :meth:`wait` method + should block for. + """ + return self._dead.wait(timeout) diff --git a/taskflow/conductors/backends/impl_nonblocking.py b/taskflow/conductors/backends/impl_nonblocking.py new file mode 100644 index 000000000..76893d703 --- /dev/null +++ b/taskflow/conductors/backends/impl_nonblocking.py @@ -0,0 +1,69 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import futurist +import six + +from taskflow.conductors.backends import impl_executor +from taskflow.utils import threading_utils as tu + + +class NonBlockingConductor(impl_executor.ExecutorConductor): + """Non-blocking conductor that processes job(s) using a thread executor. + + NOTE(harlowja): A custom executor factory can be provided via keyword + argument ``executor_factory``, if provided it will be + invoked at + :py:meth:`~taskflow.conductors.base.Conductor.run` time + with one positional argument (this conductor) and it must + return a compatible `executor`_ which can be used + to submit jobs to. If ``None`` is a provided a thread pool + backed executor is selected by default (it will have + an equivalent number of workers as this conductors + simultaneous job count). + + .. 
_executor: https://docs.python.org/dev/library/\ + concurrent.futures.html#executor-objects + """ + + MAX_SIMULTANEOUS_JOBS = tu.get_optimal_thread_count() + """ + Default maximum number of jobs that can be in progress at the same time. + """ + + def _default_executor_factory(self): + max_simultaneous_jobs = self._max_simultaneous_jobs + if max_simultaneous_jobs <= 0: + max_workers = tu.get_optimal_thread_count() + else: + max_workers = max_simultaneous_jobs + return futurist.ThreadPoolExecutor(max_workers=max_workers) + + def __init__(self, name, jobboard, + persistence=None, engine=None, + engine_options=None, wait_timeout=None, + log=None, max_simultaneous_jobs=MAX_SIMULTANEOUS_JOBS, + executor_factory=None): + super(NonBlockingConductor, self).__init__( + name, jobboard, + persistence=persistence, engine=engine, + engine_options=engine_options, wait_timeout=wait_timeout, + log=log, max_simultaneous_jobs=max_simultaneous_jobs) + if executor_factory is None: + self._executor_factory = self._default_executor_factory + else: + if not six.callable(executor_factory): + raise ValueError("Provided keyword argument 'executor_factory'" + " must be callable") + self._executor_factory = executor_factory diff --git a/taskflow/conductors/base.py b/taskflow/conductors/base.py index 694242324..750d8cffe 100644 --- a/taskflow/conductors/base.py +++ b/taskflow/conductors/base.py @@ -13,6 +13,7 @@ # under the License. import abc +import os import threading import fasteners @@ -20,7 +21,9 @@ import six from taskflow import engines from taskflow import exceptions as excp +from taskflow.types import entity from taskflow.types import notifier +from taskflow.utils import misc @six.add_metaclass(abc.ABCMeta) @@ -35,6 +38,9 @@ class Conductor(object): period of time will finish up the prior failed conductors work. 
""" + #: Entity kind used when creating new entity objects + ENTITY_KIND = 'conductor' + def __init__(self, name, jobboard, persistence=None, engine=None, engine_options=None): self._name = name @@ -48,6 +54,18 @@ class Conductor(object): self._lock = threading.RLock() self._notifier = notifier.Notifier() + @misc.cachedproperty + def conductor(self): + """Entity object that represents this conductor.""" + hostname = misc.get_hostname() + pid = os.getpid() + name = '@'.join([self._name, hostname + ":" + str(pid)]) + metadata = { + 'hostname': hostname, + 'pid': pid, + } + return entity.Entity(self.ENTITY_KIND, name, metadata) + @property def notifier(self): """The conductor actions (or other state changes) notifier. @@ -134,8 +152,17 @@ class Conductor(object): self._jobboard.close() @abc.abstractmethod - def run(self): - """Continuously claims, runs, and consumes jobs (and repeat).""" + def run(self, max_dispatches=None): + """Continuously claims, runs, and consumes jobs (and repeat). + + :param max_dispatches: An upper bound on the number of jobs that will + be dispatched, if none or negative this implies + there is no limit to the number of jobs that + will be dispatched, otherwise if positive this + run method will return when that amount of jobs + has been dispatched (instead of running + forever and/or until stopped). + """ @abc.abstractmethod def _dispatch_job(self, job): diff --git a/taskflow/jobs/base.py b/taskflow/jobs/base.py index 9e95ee1cc..8e5d77c35 100644 --- a/taskflow/jobs/base.py +++ b/taskflow/jobs/base.py @@ -388,7 +388,12 @@ class JobBoard(object): @abc.abstractmethod def register_entity(self, entity): - """Register an entity to the jobboard('s backend), e.g: a conductor""" + """Register an entity to the jobboard('s backend), e.g: a conductor. 
+ + :param entity: entity to register as being associated with the + jobboard('s backend) + :type entity: :py:class:`~taskflow.types.entity.Entity` + """ @abc.abstractproperty def connected(self): diff --git a/taskflow/tests/unit/conductor/__init__.py b/taskflow/tests/unit/conductor/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/taskflow/tests/unit/conductor/test_blocking.py b/taskflow/tests/unit/test_conductors.py similarity index 75% rename from taskflow/tests/unit/conductor/test_blocking.py rename to taskflow/tests/unit/test_conductors.py index 29d211fc9..d7f84d50b 100644 --- a/taskflow/tests/unit/conductor/test_blocking.py +++ b/taskflow/tests/unit/test_conductors.py @@ -18,6 +18,8 @@ import collections import contextlib import threading +import futurist +import testscenarios from zake import fake_client from taskflow.conductors import backends @@ -51,23 +53,39 @@ def test_factory(blowup): return f +def single_factory(): + return futurist.ThreadPoolExecutor(max_workers=1) + + ComponentBundle = collections.namedtuple('ComponentBundle', ['board', 'client', 'persistence', 'conductor']) -class BlockingConductorTest(test_utils.EngineTestBase, test.TestCase): - KIND = 'blocking' +class ManyConductorTest(testscenarios.TestWithScenarios, + test_utils.EngineTestBase, test.TestCase): + scenarios = [ + ('blocking', {'kind': 'blocking', + 'conductor_kwargs': {'wait_timeout': 0.1}}), + ('nonblocking_many_thread', + {'kind': 'nonblocking', 'conductor_kwargs': {'wait_timeout': 0.1}}), + ('nonblocking_one_thread', {'kind': 'nonblocking', + 'conductor_kwargs': { + 'executor_factory': single_factory, + 'wait_timeout': 0.1, + }}) + ] - def make_components(self, name='testing', wait_timeout=0.1): + def make_components(self): client = fake_client.FakeClient() persistence = impl_memory.MemoryBackend() - board = impl_zookeeper.ZookeeperJobBoard(name, {}, + board = impl_zookeeper.ZookeeperJobBoard('testing', {}, client=client, persistence=persistence) - 
conductor = backends.fetch(self.KIND, name, board, - persistence=persistence, - wait_timeout=wait_timeout) + conductor_kwargs = self.conductor_kwargs.copy() + conductor_kwargs['persistence'] = persistence + conductor = backends.fetch(self.kind, 'testing', board, + **conductor_kwargs) return ComponentBundle(board, client, persistence, conductor) def test_connection(self): @@ -178,3 +196,29 @@ class BlockingConductorTest(test_utils.EngineTestBase, test.TestCase): fd = lb.find(fd.uuid) self.assertIsNotNone(fd) self.assertEqual(st.REVERTED, fd.state) + + +class NonBlockingExecutorTest(test.TestCase): + def test_bad_wait_timeout(self): + persistence = impl_memory.MemoryBackend() + client = fake_client.FakeClient() + board = impl_zookeeper.ZookeeperJobBoard('testing', {}, + client=client, + persistence=persistence) + self.assertRaises(ValueError, + backends.fetch, + 'nonblocking', 'testing', board, + persistence=persistence, + wait_timeout='testing') + + def test_bad_factory(self): + persistence = impl_memory.MemoryBackend() + client = fake_client.FakeClient() + board = impl_zookeeper.ZookeeperJobBoard('testing', {}, + client=client, + persistence=persistence) + self.assertRaises(ValueError, + backends.fetch, + 'nonblocking', 'testing', board, + persistence=persistence, + executor_factory='testing') diff --git a/taskflow/types/timing.py b/taskflow/types/timing.py index 2fa7d20a6..99aeac564 100644 --- a/taskflow/types/timing.py +++ b/taskflow/types/timing.py @@ -31,11 +31,11 @@ class Timeout(object): This object has the ability to be interrupted before the actual timeout is reached. 
""" - def __init__(self, timeout): + def __init__(self, timeout, event_factory=threading.Event): if timeout < 0: raise ValueError("Timeout must be >= 0 and not %s" % (timeout)) self._timeout = timeout - self._event = threading.Event() + self._event = event_factory() def interrupt(self): self._event.set() diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py index aa89aa817..ca8faa5e4 100644 --- a/taskflow/utils/misc.py +++ b/taskflow/utils/misc.py @@ -22,6 +22,7 @@ import errno import inspect import os import re +import socket import sys import threading import types @@ -42,6 +43,7 @@ from taskflow.types import notifier from taskflow.utils import deprecation +UNKNOWN_HOSTNAME = "" NUMERIC_TYPES = six.integer_types + (float,) # NOTE(imelnikov): regular expression to get scheme from URI, @@ -68,6 +70,18 @@ class StringIO(six.StringIO): self.write(linesep) +def get_hostname(unknown_hostname=UNKNOWN_HOSTNAME): + """Gets the machines hostname; if not able to returns an invalid one.""" + try: + hostname = socket.getfqdn() + if not hostname: + return unknown_hostname + else: + return hostname + except socket.error: + return unknown_hostname + + def match_type(obj, matchers): """Matches a given object using the given matchers list/iterable. diff --git a/taskflow/utils/threading_utils.py b/taskflow/utils/threading_utils.py index 7de0151d8..ed5546839 100644 --- a/taskflow/utils/threading_utils.py +++ b/taskflow/utils/threading_utils.py @@ -15,6 +15,7 @@ # under the License. import collections +import multiprocessing import threading import six @@ -35,6 +36,17 @@ def get_ident(): return _thread.get_ident() +def get_optimal_thread_count(default=2): + """Try to guess optimal thread count for current system.""" + try: + return multiprocessing.cpu_count() + 1 + except NotImplementedError: + # NOTE(harlowja): apparently may raise so in this case we will + # just setup two threads since it's hard to know what else we + # should do in this situation. 
+ return default + + def daemon_thread(target, *args, **kwargs): """Makes a daemon thread that calls the given target when started.""" thread = threading.Thread(target=target, args=args, kwargs=kwargs)