Add an executor-backed conductor and have the existing impl. use it
This adds an executor-backed job dispatching base class and has the existing blocking executor use it by running jobs and dispatching jobs into a sync executor. It also allows for dispatching jobs into a thread executor, or other executor via a new '_executor_factory' method that can generate executors (it can be overridden in the non-blocking executor to provide your own executor instances). This does alter the behavior in that now that jobs are dispatched into an executor we can no longer immediately know if a job was dispatched and raised an exception or whether it will raise an exception in the future, so we now alter the 'local_dispatched' to just be a boolean that is used to determine if any dispatches happened (failure or not). Change-Id: I485770e8f4c85d3833892a453c9fb5168d8f0407
This commit is contained in:
parent
05fbf1faac
commit
ae9c701f90
@ -9,7 +9,7 @@ Conductors
|
||||
Overview
|
||||
========
|
||||
|
||||
Conductors in TaskFlow provide a mechanism that unifies the various TaskFlow
|
||||
Conductors provide a mechanism that unifies the various
|
||||
concepts under a single easy to use (as plug-and-play as we can make it)
|
||||
construct.
|
||||
|
||||
@ -66,6 +66,7 @@ Interfaces
|
||||
|
||||
.. automodule:: taskflow.conductors.base
|
||||
.. automodule:: taskflow.conductors.backends
|
||||
.. automodule:: taskflow.conductors.backends.impl_executor
|
||||
|
||||
Implementations
|
||||
===============
|
||||
@ -75,12 +76,19 @@ Blocking
|
||||
|
||||
.. automodule:: taskflow.conductors.backends.impl_blocking
|
||||
|
||||
Non-blocking
|
||||
------------
|
||||
|
||||
.. automodule:: taskflow.conductors.backends.impl_nonblocking
|
||||
|
||||
Hierarchy
|
||||
=========
|
||||
|
||||
.. inheritance-diagram::
|
||||
taskflow.conductors.base
|
||||
taskflow.conductors.backends.impl_blocking
|
||||
taskflow.conductors.backends.impl_nonblocking
|
||||
taskflow.conductors.backends.impl_executor
|
||||
:parts: 1
|
||||
|
||||
.. _musical conductors: http://en.wikipedia.org/wiki/Conducting
|
||||
|
@ -37,6 +37,7 @@ taskflow.jobboards =
|
||||
|
||||
taskflow.conductors =
|
||||
blocking = taskflow.conductors.backends.impl_blocking:BlockingConductor
|
||||
nonblocking = taskflow.conductors.backends.impl_nonblocking:NonBlockingConductor
|
||||
|
||||
taskflow.persistence =
|
||||
dir = taskflow.persistence.backends.impl_dir:DirBackend
|
||||
|
@ -12,254 +12,30 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
import socket
|
||||
import futurist
|
||||
|
||||
import threading
|
||||
|
||||
try:
|
||||
from contextlib import ExitStack # noqa
|
||||
except ImportError:
|
||||
from contextlib2 import ExitStack # noqa
|
||||
|
||||
from debtcollector import removals
|
||||
from oslo_utils import excutils
|
||||
import six
|
||||
from taskflow.conductors import base
|
||||
from taskflow import exceptions as excp
|
||||
from taskflow.listeners import logging as logging_listener
|
||||
from taskflow import logging
|
||||
from taskflow.types import entity
|
||||
from taskflow.types import timing as tt
|
||||
from taskflow.utils import async_utils
|
||||
from taskflow.utils import iter_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
WAIT_TIMEOUT = 0.5
|
||||
NO_CONSUME_EXCEPTIONS = tuple([
|
||||
excp.ExecutionFailure,
|
||||
excp.StorageFailure,
|
||||
])
|
||||
from taskflow.conductors.backends import impl_executor
|
||||
|
||||
|
||||
class BlockingConductor(base.Conductor):
|
||||
"""A conductor that runs jobs in its own dispatching loop.
|
||||
class BlockingConductor(impl_executor.ExecutorConductor):
|
||||
"""Blocking conductor that processes job(s) in a blocking manner."""
|
||||
|
||||
This conductor iterates over jobs in the provided jobboard (waiting for
|
||||
the given timeout if no jobs exist) and attempts to claim them, work on
|
||||
those jobs in its local thread (blocking further work from being claimed
|
||||
and consumed) and then consume those work units after completion. This
|
||||
process will repeat until the conductor has been stopped or other critical
|
||||
error occurs.
|
||||
|
||||
NOTE(harlowja): consumption occurs even if a engine fails to run due to
|
||||
a task failure. This is only skipped when an execution failure or
|
||||
a storage failure occurs which are *usually* correctable by re-running on
|
||||
a different conductor (storage failures and execution failures may be
|
||||
transient issues that can be worked around by later execution). If a job
|
||||
after completing can not be consumed or abandoned the conductor relies
|
||||
upon the jobboard capabilities to automatically abandon these jobs.
|
||||
MAX_SIMULTANEOUS_JOBS = 1
|
||||
"""
|
||||
Default maximum number of jobs that can be in progress at the same time.
|
||||
"""
|
||||
|
||||
START_FINISH_EVENTS_EMITTED = tuple([
|
||||
'compilation', 'preparation',
|
||||
'validation', 'running',
|
||||
])
|
||||
"""Events will be emitted for the start and finish of each engine
|
||||
activity defined above, the actual event name that can be registered
|
||||
to subscribe to will be ``${event}_start`` and ``${event}_end`` where
|
||||
the ``${event}`` in this pseudo-variable will be one of these events.
|
||||
"""
|
||||
@staticmethod
|
||||
def _executor_factory():
|
||||
return futurist.SynchronousExecutor()
|
||||
|
||||
def __init__(self, name, jobboard,
|
||||
persistence=None, engine=None,
|
||||
engine_options=None, wait_timeout=None):
|
||||
engine_options=None, wait_timeout=None,
|
||||
log=None, max_simultaneous_jobs=MAX_SIMULTANEOUS_JOBS):
|
||||
super(BlockingConductor, self).__init__(
|
||||
name, jobboard, persistence=persistence,
|
||||
engine=engine, engine_options=engine_options)
|
||||
if wait_timeout is None:
|
||||
wait_timeout = WAIT_TIMEOUT
|
||||
if isinstance(wait_timeout, (int, float) + six.string_types):
|
||||
self._wait_timeout = tt.Timeout(float(wait_timeout))
|
||||
elif isinstance(wait_timeout, tt.Timeout):
|
||||
self._wait_timeout = wait_timeout
|
||||
else:
|
||||
raise ValueError("Invalid timeout literal: %s" % (wait_timeout))
|
||||
self._dead = threading.Event()
|
||||
|
||||
@removals.removed_kwarg('timeout', version="0.8", removal_version="2.0")
|
||||
def stop(self, timeout=None):
|
||||
"""Requests the conductor to stop dispatching.
|
||||
|
||||
This method can be used to request that a conductor stop its
|
||||
consumption & dispatching loop.
|
||||
|
||||
The method returns immediately regardless of whether the conductor has
|
||||
been stopped.
|
||||
|
||||
.. deprecated:: 0.8
|
||||
|
||||
The ``timeout`` parameter is **deprecated** and is present for
|
||||
backward compatibility **only**. In order to wait for the
|
||||
conductor to gracefully shut down, :py:meth:`wait` should be used
|
||||
instead.
|
||||
"""
|
||||
self._wait_timeout.interrupt()
|
||||
|
||||
@property
|
||||
def dispatching(self):
|
||||
return not self._dead.is_set()
|
||||
|
||||
def _listeners_from_job(self, job, engine):
|
||||
listeners = super(BlockingConductor, self)._listeners_from_job(job,
|
||||
engine)
|
||||
listeners.append(logging_listener.LoggingListener(engine, log=LOG))
|
||||
return listeners
|
||||
|
||||
def _dispatch_job(self, job):
|
||||
engine = self._engine_from_job(job)
|
||||
listeners = self._listeners_from_job(job, engine)
|
||||
with ExitStack() as stack:
|
||||
for listener in listeners:
|
||||
stack.enter_context(listener)
|
||||
LOG.debug("Dispatching engine for job '%s'", job)
|
||||
consume = True
|
||||
try:
|
||||
for stage_func, event_name in [(engine.compile, 'compilation'),
|
||||
(engine.prepare, 'preparation'),
|
||||
(engine.validate, 'validation'),
|
||||
(engine.run, 'running')]:
|
||||
self._notifier.notify("%s_start" % event_name, {
|
||||
'job': job,
|
||||
'engine': engine,
|
||||
'conductor': self,
|
||||
})
|
||||
stage_func()
|
||||
self._notifier.notify("%s_end" % event_name, {
|
||||
'job': job,
|
||||
'engine': engine,
|
||||
'conductor': self,
|
||||
})
|
||||
except excp.WrappedFailure as e:
|
||||
if all((f.check(*NO_CONSUME_EXCEPTIONS) for f in e)):
|
||||
consume = False
|
||||
if LOG.isEnabledFor(logging.WARNING):
|
||||
if consume:
|
||||
LOG.warn("Job execution failed (consumption being"
|
||||
" skipped): %s [%s failures]", job, len(e))
|
||||
else:
|
||||
LOG.warn("Job execution failed (consumption"
|
||||
" proceeding): %s [%s failures]", job, len(e))
|
||||
# Show the failure/s + traceback (if possible)...
|
||||
for i, f in enumerate(e):
|
||||
LOG.warn("%s. %s", i + 1, f.pformat(traceback=True))
|
||||
except NO_CONSUME_EXCEPTIONS:
|
||||
LOG.warn("Job execution failed (consumption being"
|
||||
" skipped): %s", job, exc_info=True)
|
||||
consume = False
|
||||
except Exception:
|
||||
LOG.warn("Job execution failed (consumption proceeding): %s",
|
||||
job, exc_info=True)
|
||||
else:
|
||||
LOG.info("Job completed successfully: %s", job)
|
||||
return async_utils.make_completed_future(consume)
|
||||
|
||||
def _get_conductor_info(self):
|
||||
"""For right now we just register the conductor name as:
|
||||
|
||||
<conductor_name>@<hostname>:<process_pid>
|
||||
|
||||
"""
|
||||
hostname = socket.gethostname()
|
||||
pid = os.getpid()
|
||||
name = '@'.join([
|
||||
self._name, hostname+":"+str(pid)])
|
||||
# Can add a lot more information here,
|
||||
metadata = {
|
||||
"hostname": hostname,
|
||||
"pid": pid
|
||||
}
|
||||
|
||||
return entity.Entity("conductor", name, metadata)
|
||||
|
||||
def run(self, max_dispatches=None):
|
||||
self._dead.clear()
|
||||
|
||||
# Register a conductor type entity
|
||||
self._jobboard.register_entity(self._get_conductor_info())
|
||||
|
||||
total_dispatched = 0
|
||||
try:
|
||||
|
||||
if max_dispatches is None:
|
||||
# NOTE(TheSriram): if max_dispatches is not set,
|
||||
# then the conductor will run indefinitely, and not
|
||||
# stop after 'n' number of dispatches
|
||||
max_dispatches = -1
|
||||
|
||||
dispatch_gen = iter_utils.iter_forever(max_dispatches)
|
||||
|
||||
while True:
|
||||
if self._wait_timeout.is_stopped():
|
||||
break
|
||||
local_dispatched = 0
|
||||
for job in self._jobboard.iterjobs():
|
||||
if self._wait_timeout.is_stopped():
|
||||
break
|
||||
LOG.debug("Trying to claim job: %s", job)
|
||||
try:
|
||||
self._jobboard.claim(job, self._name)
|
||||
except (excp.UnclaimableJob, excp.NotFound):
|
||||
LOG.debug("Job already claimed or consumed: %s", job)
|
||||
continue
|
||||
consume = False
|
||||
try:
|
||||
f = self._dispatch_job(job)
|
||||
except KeyboardInterrupt:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.warn("Job dispatching interrupted: %s", job)
|
||||
except Exception:
|
||||
LOG.warn("Job dispatching failed: %s", job,
|
||||
exc_info=True)
|
||||
else:
|
||||
|
||||
local_dispatched += 1
|
||||
consume = f.result()
|
||||
try:
|
||||
if consume:
|
||||
self._jobboard.consume(job, self._name)
|
||||
else:
|
||||
self._jobboard.abandon(job, self._name)
|
||||
except (excp.JobFailure, excp.NotFound):
|
||||
if consume:
|
||||
LOG.warn("Failed job consumption: %s", job,
|
||||
exc_info=True)
|
||||
else:
|
||||
LOG.warn("Failed job abandonment: %s", job,
|
||||
exc_info=True)
|
||||
|
||||
total_dispatched = next(dispatch_gen)
|
||||
|
||||
if local_dispatched == 0 and \
|
||||
not self._wait_timeout.is_stopped():
|
||||
self._wait_timeout.wait()
|
||||
|
||||
except StopIteration:
|
||||
if max_dispatches >= 0 and total_dispatched >= max_dispatches:
|
||||
LOG.info("Maximum dispatch limit of %s reached",
|
||||
max_dispatches)
|
||||
finally:
|
||||
self._dead.set()
|
||||
|
||||
def wait(self, timeout=None):
|
||||
"""Waits for the conductor to gracefully exit.
|
||||
|
||||
This method waits for the conductor to gracefully exit. An optional
|
||||
timeout can be provided, which will cause the method to return
|
||||
within the specified timeout. If the timeout is reached, the returned
|
||||
value will be False.
|
||||
|
||||
:param timeout: Maximum number of seconds that the :meth:`wait` method
|
||||
should block for.
|
||||
"""
|
||||
return self._dead.wait(timeout)
|
||||
name, jobboard,
|
||||
persistence=persistence, engine=engine,
|
||||
engine_options=engine_options,
|
||||
wait_timeout=wait_timeout, log=log,
|
||||
max_simultaneous_jobs=max_simultaneous_jobs)
|
||||
|
333
taskflow/conductors/backends/impl_executor.py
Normal file
333
taskflow/conductors/backends/impl_executor.py
Normal file
@ -0,0 +1,333 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
import functools
|
||||
import itertools
|
||||
import threading
|
||||
|
||||
try:
|
||||
from contextlib import ExitStack # noqa
|
||||
except ImportError:
|
||||
from contextlib2 import ExitStack # noqa
|
||||
|
||||
from debtcollector import removals
|
||||
from oslo_utils import excutils
|
||||
import six
|
||||
|
||||
from taskflow.conductors import base
|
||||
from taskflow import exceptions as excp
|
||||
from taskflow.listeners import logging as logging_listener
|
||||
from taskflow import logging
|
||||
from taskflow.types import timing as tt
|
||||
from taskflow.utils import iter_utils
|
||||
from taskflow.utils import misc
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _convert_to_timeout(value=None, default_value=None, event_factory=None):
    """Coerce a user supplied timeout into a ``tt.Timeout`` object.

    :param value: the timeout to convert; may be ``None`` (in which case
                  ``default_value`` is used instead), a number, a string
                  that ``float()`` can parse, or an existing ``tt.Timeout``
                  (which is returned unchanged)
    :param default_value: fallback used when ``value`` is ``None``
    :param event_factory: passed through to ``tt.Timeout`` when a new
                          timeout object has to be created
    :returns: a ``tt.Timeout`` instance
    :raises ValueError: if the value (after applying the default) is not
                        one of the supported types, or is a string that
                        ``float()`` can not parse
    """
    if value is None:
        value = default_value
    if isinstance(value, (int, float) + six.string_types):
        return tt.Timeout(float(value), event_factory=event_factory)
    elif isinstance(value, tt.Timeout):
        return value
    else:
        # NOTE: the explicit one-element tuple matters here; a bare
        # ``% (value)`` would unpack a tuple ``value`` as the format
        # arguments and raise TypeError instead of the intended ValueError.
        raise ValueError("Invalid timeout literal '%s'" % (value,))
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
class ExecutorConductor(base.Conductor):
    """Dispatches jobs from blocking :py:meth:`.run` method to some executor.

    This conductor iterates over jobs in the provided jobboard (waiting for
    the given timeout if no jobs exist) and attempts to claim them, work on
    those jobs using an executor (potentially blocking further work from being
    claimed and consumed) and then consume those work units after
    completion. This process will repeat until the conductor has been stopped
    or other critical error occurs.

    NOTE(harlowja): consumption occurs even if an engine fails to run due to
    an atom failure. This is only skipped when an execution failure or
    a storage failure occurs which are *usually* correctable by re-running on
    a different conductor (storage failures and execution failures may be
    transient issues that can be worked around by later execution). If a job
    after completing can not be consumed or abandoned the conductor relies
    upon the jobboard capabilities to automatically abandon these jobs.
    """

    LOG = None
    """
    Logger that will be used for listening to events (if none then the module
    level logger will be used instead).
    """

    #: Default timeout used to idle/wait when no jobs have been found.
    WAIT_TIMEOUT = 0.5

    MAX_SIMULTANEOUS_JOBS = -1
    """
    Default maximum number of jobs that can be in progress at the same time.

    Negative or zero values imply no limit (do note that if a executor is
    used that is built on a queue, as most are, that this will imply that the
    queue will contain a potentially large & unfinished backlog of
    submitted jobs). This *may* get better someday if
    https://bugs.python.org/issue22737 is ever implemented and released.
    """

    #: Exceptions that will **not** cause consumption to occur.
    NO_CONSUME_EXCEPTIONS = tuple([
        excp.ExecutionFailure,
        excp.StorageFailure,
    ])

    _event_factory = threading.Event
    """This attribute *can* be overridden by subclasses (for example if
    an eventlet *green* event works better for the conductor user)."""

    START_FINISH_EVENTS_EMITTED = tuple([
        'compilation', 'preparation',
        'validation', 'running',
    ])
    """Events will be emitted for the start and finish of each engine
    activity defined above, the actual event name that can be registered
    to subscribe to will be ``${event}_start`` and ``${event}_end`` where
    the ``${event}`` in this pseudo-variable will be one of these events.
    """

    def __init__(self, name, jobboard,
                 persistence=None, engine=None,
                 engine_options=None, wait_timeout=None,
                 log=None, max_simultaneous_jobs=MAX_SIMULTANEOUS_JOBS):
        """Create the conductor (no dispatching happens until ``run``).

        :param wait_timeout: idle timeout used when no jobs were dispatched;
                             ``None`` selects :py:attr:`.WAIT_TIMEOUT`
        :param log: logger to use; when ``None`` the class level ``LOG`` and
                    then the module level logger are tried (in that order)
        :param max_simultaneous_jobs: upper bound on in-progress jobs; zero
                                      or negative values mean no limit

        The remaining arguments are passed through unchanged to the base
        conductor constructor.
        """
        super(ExecutorConductor, self).__init__(
            name, jobboard, persistence=persistence,
            engine=engine, engine_options=engine_options)
        self._wait_timeout = _convert_to_timeout(
            value=wait_timeout, default_value=self.WAIT_TIMEOUT,
            event_factory=self._event_factory)
        # Set when the dispatching loop in ``run`` has exited.
        self._dead = self._event_factory()
        self._log = misc.pick_first_not_none(log, self.LOG, LOG)
        self._max_simultaneous_jobs = int(
            misc.pick_first_not_none(max_simultaneous_jobs,
                                     self.MAX_SIMULTANEOUS_JOBS))
        # Futures of jobs that were submitted but have not yet finished.
        self._dispatched = set()

    def _executor_factory(self):
        """Creates an executor to be used during dispatching."""
        raise excp.NotImplementedError("This method must be implemented but"
                                       " it has not been")

    @removals.removed_kwarg('timeout', version="0.8", removal_version="2.0")
    def stop(self, timeout=None):
        """Requests the conductor to stop dispatching.

        This method can be used to request that a conductor stop its
        consumption & dispatching loop.

        The method returns immediately regardless of whether the conductor has
        been stopped.

        .. deprecated:: 0.8

            The ``timeout`` parameter is **deprecated** and is present for
            backward compatibility **only**. In order to wait for the
            conductor to gracefully shut down, :py:meth:`wait` should be used
            instead.
        """
        self._wait_timeout.interrupt()

    @property
    def dispatching(self):
        """Whether or not the dispatching loop is still dispatching."""
        return not self._dead.is_set()

    def _listeners_from_job(self, job, engine):
        """Return the base listeners plus a logging listener for the engine."""
        listeners = super(ExecutorConductor, self)._listeners_from_job(
            job, engine)
        listeners.append(logging_listener.LoggingListener(engine,
                                                          log=self._log))
        return listeners

    def _dispatch_job(self, job):
        """Run the engine for a job and decide whether it should be consumed.

        Each engine stage (compile, prepare, validate, run) is bracketed by
        ``<event>_start`` and ``<event>_end`` notifications.

        :returns: ``True`` if the job should be consumed afterwards, ``False``
                  if it should be abandoned (only when the failure(s) are all
                  of the :py:attr:`.NO_CONSUME_EXCEPTIONS` kinds)
        """
        engine = self._engine_from_job(job)
        listeners = self._listeners_from_job(job, engine)
        with ExitStack() as stack:
            for listener in listeners:
                stack.enter_context(listener)
            self._log.debug("Dispatching engine for job '%s'", job)
            consume = True
            try:
                for stage_func, event_name in [(engine.compile, 'compilation'),
                                               (engine.prepare, 'preparation'),
                                               (engine.validate, 'validation'),
                                               (engine.run, 'running')]:
                    self._notifier.notify("%s_start" % event_name, {
                        'job': job,
                        'engine': engine,
                        'conductor': self,
                    })
                    stage_func()
                    self._notifier.notify("%s_end" % event_name, {
                        'job': job,
                        'engine': engine,
                        'conductor': self,
                    })
            except excp.WrappedFailure as e:
                # Only skip consumption when *every* wrapped failure is one
                # of the (potentially transient) no-consume kinds.
                if all((f.check(*self.NO_CONSUME_EXCEPTIONS) for f in e)):
                    consume = False
                if self._log.isEnabledFor(logging.WARNING):
                    # NOTE: the message must describe what will actually
                    # happen; consume == True means consumption proceeds.
                    if consume:
                        self._log.warn(
                            "Job execution failed (consumption"
                            " proceeding): %s [%s failures]", job, len(e))
                    else:
                        self._log.warn(
                            "Job execution failed (consumption being"
                            " skipped): %s [%s failures]", job, len(e))
                    # Show the failure/s + traceback (if possible)...
                    for i, f in enumerate(e):
                        self._log.warn("%s. %s", i + 1,
                                       f.pformat(traceback=True))
            except self.NO_CONSUME_EXCEPTIONS:
                self._log.warn("Job execution failed (consumption being"
                               " skipped): %s", job, exc_info=True)
                consume = False
            except Exception:
                self._log.warn(
                    "Job execution failed (consumption proceeding): %s",
                    job, exc_info=True)
            else:
                self._log.info("Job completed successfully: %s", job)
            return consume

    def _try_finish_job(self, job, consume):
        """Consume (or abandon) a job, logging instead of raising on failure.

        :param consume: when true the job is consumed, otherwise abandoned
        """
        try:
            if consume:
                self._jobboard.consume(job, self._name)
            else:
                self._jobboard.abandon(job, self._name)
        except (excp.JobFailure, excp.NotFound):
            if consume:
                self._log.warn("Failed job consumption: %s", job,
                               exc_info=True)
            else:
                self._log.warn("Failed job abandonment: %s", job,
                               exc_info=True)

    def _on_job_done(self, job, fut):
        """Future done-callback that finishes the job the future ran.

        If fetching the result fails the job is abandoned (``consume`` stays
        false); a ``KeyboardInterrupt`` is logged and re-raised.
        """
        consume = False
        try:
            consume = fut.result()
        except KeyboardInterrupt:
            with excutils.save_and_reraise_exception():
                self._log.warn("Job dispatching interrupted: %s", job)
        except Exception:
            self._log.warn("Job dispatching failed: %s", job, exc_info=True)
        try:
            self._try_finish_job(job, consume)
        finally:
            # Always free up the slot, even if finishing blew up.
            self._dispatched.discard(fut)

    def _can_claim_more_jobs(self, job):
        """Predicate deciding whether another job may be claimed right now.

        The ``job`` argument is unused; it exists so that this method can be
        used directly as a ``takewhile`` predicate over the jobboard jobs.
        """
        if self._wait_timeout.is_stopped():
            return False
        if self._max_simultaneous_jobs <= 0:
            return True
        if len(self._dispatched) >= self._max_simultaneous_jobs:
            return False
        else:
            return True

    def _run_until_dead(self, executor, max_dispatches=None):
        """Claim and submit jobs to the executor until stopped.

        :param executor: executor that claimed jobs are submitted into
        :param max_dispatches: dispatch limit (``None`` is treated as ``-1``)
        :raises StopIteration: when the dispatch limit has been reached (or
                               is zero); callers are expected to catch this
        """
        total_dispatched = 0
        if max_dispatches is None:
            # NOTE(TheSriram): if max_dispatches is not set,
            # then the conductor will run indefinitely, and not
            # stop after 'n' number of dispatches
            max_dispatches = -1
        dispatch_gen = iter_utils.iter_forever(max_dispatches)
        is_stopped = self._wait_timeout.is_stopped
        try:
            # Don't even do any work in the first place...
            if max_dispatches == 0:
                raise StopIteration
            while not is_stopped():
                any_dispatched = False
                for job in itertools.takewhile(self._can_claim_more_jobs,
                                               self._jobboard.iterjobs()):
                    self._log.debug("Trying to claim job: %s", job)
                    try:
                        self._jobboard.claim(job, self._name)
                    except (excp.UnclaimableJob, excp.NotFound):
                        self._log.debug("Job already claimed or"
                                        " consumed: %s", job)
                    else:
                        try:
                            fut = executor.submit(self._dispatch_job, job)
                        except RuntimeError:
                            # Give the claimed job back before propagating.
                            with excutils.save_and_reraise_exception():
                                self._log.warn("Job dispatch submitting"
                                               " failed: %s", job)
                                self._try_finish_job(job, False)
                        else:
                            fut.job = job
                            # Track the future *before* attaching the
                            # callback, since the callback (which discards
                            # it) may run immediately if already done.
                            self._dispatched.add(fut)
                            any_dispatched = True
                            fut.add_done_callback(
                                functools.partial(self._on_job_done, job))
                            total_dispatched = next(dispatch_gen)
                if not any_dispatched and not is_stopped():
                    self._wait_timeout.wait()
        except StopIteration:
            # This will be raised from 'dispatch_gen' if it reaches its
            # max dispatch number (which implies we should do no more work).
            with excutils.save_and_reraise_exception():
                if max_dispatches >= 0 and total_dispatched >= max_dispatches:
                    self._log.info("Maximum dispatch limit of %s reached",
                                   max_dispatches)

    def run(self, max_dispatches=None):
        self._dead.clear()
        self._dispatched.clear()
        try:
            self._jobboard.register_entity(self.conductor)
            with self._executor_factory() as executor:
                self._run_until_dead(executor,
                                     max_dispatches=max_dispatches)
        except StopIteration:
            pass
        except KeyboardInterrupt:
            with excutils.save_and_reraise_exception():
                self._log.warn("Job dispatching interrupted")
        finally:
            self._dead.set()

    # Inherit the docs, so we can reference them in our class docstring,
    # if we don't do this sphinx gets confused...
    run.__doc__ = base.Conductor.run.__doc__

    def wait(self, timeout=None):
        """Waits for the conductor to gracefully exit.

        This method waits for the conductor to gracefully exit. An optional
        timeout can be provided, which will cause the method to return
        within the specified timeout. If the timeout is reached, the returned
        value will be ``False``, otherwise it will be ``True``.

        :param timeout: Maximum number of seconds that the :meth:`wait` method
                        should block for.
        """
        return self._dead.wait(timeout)
|
69
taskflow/conductors/backends/impl_nonblocking.py
Normal file
69
taskflow/conductors/backends/impl_nonblocking.py
Normal file
@ -0,0 +1,69 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import futurist
|
||||
import six
|
||||
|
||||
from taskflow.conductors.backends import impl_executor
|
||||
from taskflow.utils import threading_utils as tu
|
||||
|
||||
|
||||
class NonBlockingConductor(impl_executor.ExecutorConductor):
    """Non-blocking conductor that processes job(s) using a thread executor.

    NOTE(harlowja): A custom executor factory can be provided via keyword
                    argument ``executor_factory``, if provided it will be
                    invoked at
                    :py:meth:`~taskflow.conductors.base.Conductor.run` time
                    with no arguments and it must
                    return a compatible `executor`_ which can be used
                    to submit jobs to. If ``None`` is provided a thread pool
                    backed executor is selected by default (it will have
                    an equivalent number of workers as this conductor's
                    simultaneous job count).

    .. _executor: https://docs.python.org/dev/library/\
                  concurrent.futures.html#executor-objects
    """

    MAX_SIMULTANEOUS_JOBS = tu.get_optimal_thread_count()
    """
    Default maximum number of jobs that can be in progress at the same time.
    """

    def _default_executor_factory(self):
        """Create a thread pool executor sized to the simultaneous job count.

        When the simultaneous job count is zero or negative (no limit) the
        worker count falls back to ``tu.get_optimal_thread_count()``.
        """
        max_simultaneous_jobs = self._max_simultaneous_jobs
        if max_simultaneous_jobs <= 0:
            max_workers = tu.get_optimal_thread_count()
        else:
            max_workers = max_simultaneous_jobs
        return futurist.ThreadPoolExecutor(max_workers=max_workers)

    def __init__(self, name, jobboard,
                 persistence=None, engine=None,
                 engine_options=None, wait_timeout=None,
                 log=None, max_simultaneous_jobs=MAX_SIMULTANEOUS_JOBS,
                 executor_factory=None):
        """Create the conductor and select the executor factory to use.

        :param executor_factory: optional callable that takes no arguments
                                 and returns an executor; when ``None`` the
                                 default thread pool factory is used
        :raises ValueError: if ``executor_factory`` is provided but is not
                            callable

        All other arguments are passed through unchanged to the parent
        constructor.
        """
        super(NonBlockingConductor, self).__init__(
            name, jobboard,
            persistence=persistence, engine=engine,
            engine_options=engine_options, wait_timeout=wait_timeout,
            log=log, max_simultaneous_jobs=max_simultaneous_jobs)
        if executor_factory is None:
            self._executor_factory = self._default_executor_factory
        else:
            if not six.callable(executor_factory):
                raise ValueError("Provided keyword argument 'executor_factory'"
                                 " must be callable")
            # Stored on the instance (not the class), so it is later called
            # exactly as given, without this conductor being passed in.
            self._executor_factory = executor_factory
|
@ -13,6 +13,7 @@
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
import os
|
||||
import threading
|
||||
|
||||
import fasteners
|
||||
@ -20,7 +21,9 @@ import six
|
||||
|
||||
from taskflow import engines
|
||||
from taskflow import exceptions as excp
|
||||
from taskflow.types import entity
|
||||
from taskflow.types import notifier
|
||||
from taskflow.utils import misc
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
@ -35,6 +38,9 @@ class Conductor(object):
|
||||
period of time will finish up the prior failed conductors work.
|
||||
"""
|
||||
|
||||
#: Entity kind used when creating new entity objects
|
||||
ENTITY_KIND = 'conductor'
|
||||
|
||||
def __init__(self, name, jobboard,
|
||||
persistence=None, engine=None, engine_options=None):
|
||||
self._name = name
|
||||
@ -48,6 +54,18 @@ class Conductor(object):
|
||||
self._lock = threading.RLock()
|
||||
self._notifier = notifier.Notifier()
|
||||
|
||||
@misc.cachedproperty
|
||||
def conductor(self):
|
||||
"""Entity object that represents this conductor."""
|
||||
hostname = misc.get_hostname()
|
||||
pid = os.getpid()
|
||||
name = '@'.join([self._name, hostname + ":" + str(pid)])
|
||||
metadata = {
|
||||
'hostname': hostname,
|
||||
'pid': pid,
|
||||
}
|
||||
return entity.Entity(self.ENTITY_KIND, name, metadata)
|
||||
|
||||
@property
|
||||
def notifier(self):
|
||||
"""The conductor actions (or other state changes) notifier.
|
||||
@ -134,8 +152,17 @@ class Conductor(object):
|
||||
self._jobboard.close()
|
||||
|
||||
@abc.abstractmethod
|
||||
def run(self):
|
||||
"""Continuously claims, runs, and consumes jobs (and repeat)."""
|
||||
def run(self, max_dispatches=None):
|
||||
"""Continuously claims, runs, and consumes jobs (and repeat).
|
||||
|
||||
:param max_dispatches: An upper bound on the number of jobs that will
|
||||
be dispatched, if none or negative this implies
|
||||
there is no limit to the number of jobs that
|
||||
will be dispatched, otherwise if positive this
|
||||
run method will return when that amount of jobs
|
||||
has been dispatched (instead of running
|
||||
forever and/or until stopped).
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def _dispatch_job(self, job):
|
||||
|
@ -388,7 +388,12 @@ class JobBoard(object):
|
||||
|
||||
@abc.abstractmethod
|
||||
def register_entity(self, entity):
|
||||
"""Register an entity to the jobboard('s backend), e.g: a conductor"""
|
||||
"""Register an entity to the jobboard('s backend), e.g: a conductor.
|
||||
|
||||
:param entity: entity to register as being associated with the
|
||||
jobboard('s backend)
|
||||
:type entity: :py:class:`~taskflow.types.entity.Entity`
|
||||
"""
|
||||
|
||||
@abc.abstractproperty
|
||||
def connected(self):
|
||||
|
@ -18,6 +18,8 @@ import collections
|
||||
import contextlib
|
||||
import threading
|
||||
|
||||
import futurist
|
||||
import testscenarios
|
||||
from zake import fake_client
|
||||
|
||||
from taskflow.conductors import backends
|
||||
@ -51,23 +53,39 @@ def test_factory(blowup):
|
||||
return f
|
||||
|
||||
|
||||
def single_factory():
|
||||
return futurist.ThreadPoolExecutor(max_workers=1)
|
||||
|
||||
|
||||
ComponentBundle = collections.namedtuple('ComponentBundle',
|
||||
['board', 'client',
|
||||
'persistence', 'conductor'])
|
||||
|
||||
|
||||
class BlockingConductorTest(test_utils.EngineTestBase, test.TestCase):
|
||||
KIND = 'blocking'
|
||||
class ManyConductorTest(testscenarios.TestWithScenarios,
|
||||
test_utils.EngineTestBase, test.TestCase):
|
||||
scenarios = [
|
||||
('blocking', {'kind': 'blocking',
|
||||
'conductor_kwargs': {'wait_timeout': 0.1}}),
|
||||
('nonblocking_many_thread',
|
||||
{'kind': 'nonblocking', 'conductor_kwargs': {'wait_timeout': 0.1}}),
|
||||
('nonblocking_one_thread', {'kind': 'nonblocking',
|
||||
'conductor_kwargs': {
|
||||
'executor_factory': single_factory,
|
||||
'wait_timeout': 0.1,
|
||||
}})
|
||||
]
|
||||
|
||||
def make_components(self, name='testing', wait_timeout=0.1):
|
||||
def make_components(self):
|
||||
client = fake_client.FakeClient()
|
||||
persistence = impl_memory.MemoryBackend()
|
||||
board = impl_zookeeper.ZookeeperJobBoard(name, {},
|
||||
board = impl_zookeeper.ZookeeperJobBoard('testing', {},
|
||||
client=client,
|
||||
persistence=persistence)
|
||||
conductor = backends.fetch(self.KIND, name, board,
|
||||
persistence=persistence,
|
||||
wait_timeout=wait_timeout)
|
||||
conductor_kwargs = self.conductor_kwargs.copy()
|
||||
conductor_kwargs['persistence'] = persistence
|
||||
conductor = backends.fetch(self.kind, 'testing', board,
|
||||
**conductor_kwargs)
|
||||
return ComponentBundle(board, client, persistence, conductor)
|
||||
|
||||
def test_connection(self):
|
||||
@ -178,3 +196,29 @@ class BlockingConductorTest(test_utils.EngineTestBase, test.TestCase):
|
||||
fd = lb.find(fd.uuid)
|
||||
self.assertIsNotNone(fd)
|
||||
self.assertEqual(st.REVERTED, fd.state)
|
||||
|
||||
|
||||
class NonBlockingExecutorTest(test.TestCase):
    """Checks that bad 'nonblocking' conductor options raise ValueError."""

    def test_bad_wait_timeout(self):
        # A string is passed as the wait timeout; fetching the conductor
        # is expected to fail with a ValueError.
        backend = impl_memory.MemoryBackend()
        zk_client = fake_client.FakeClient()
        jobboard = impl_zookeeper.ZookeeperJobBoard(
            'testing', {}, client=zk_client, persistence=backend)
        fetch_options = {
            'persistence': backend,
            'wait_timeout': 'testing',
        }
        self.assertRaises(ValueError, backends.fetch,
                          'nonblocking', 'testing', jobboard,
                          **fetch_options)

    def test_bad_factory(self):
        # A string is passed as the executor factory; fetching the
        # conductor is expected to fail with a ValueError.
        backend = impl_memory.MemoryBackend()
        zk_client = fake_client.FakeClient()
        jobboard = impl_zookeeper.ZookeeperJobBoard(
            'testing', {}, client=zk_client, persistence=backend)
        fetch_options = {
            'persistence': backend,
            'executor_factory': 'testing',
        }
        self.assertRaises(ValueError, backends.fetch,
                          'nonblocking', 'testing', jobboard,
                          **fetch_options)
|
@ -31,11 +31,11 @@ class Timeout(object):
|
||||
This object has the ability to be interrupted before the actual timeout
|
||||
is reached.
|
||||
"""
|
||||
def __init__(self, timeout):
|
||||
def __init__(self, timeout, event_factory=threading.Event):
|
||||
if timeout < 0:
|
||||
raise ValueError("Timeout must be >= 0 and not %s" % (timeout))
|
||||
self._timeout = timeout
|
||||
self._event = threading.Event()
|
||||
self._event = event_factory()
|
||||
|
||||
def interrupt(self):
|
||||
self._event.set()
|
||||
|
@ -22,6 +22,7 @@ import errno
|
||||
import inspect
|
||||
import os
|
||||
import re
|
||||
import socket
|
||||
import sys
|
||||
import threading
|
||||
import types
|
||||
@ -42,6 +43,7 @@ from taskflow.types import notifier
|
||||
from taskflow.utils import deprecation
|
||||
|
||||
|
||||
UNKNOWN_HOSTNAME = "<unknown>"
|
||||
NUMERIC_TYPES = six.integer_types + (float,)
|
||||
|
||||
# NOTE(imelnikov): regular expression to get scheme from URI,
|
||||
@ -68,6 +70,18 @@ class StringIO(six.StringIO):
|
||||
self.write(linesep)
|
||||
|
||||
|
||||
def get_hostname(unknown_hostname=UNKNOWN_HOSTNAME):
    """Return the fully qualified hostname of this machine.

    :param unknown_hostname: value handed back when no usable hostname
                             could be obtained
    :returns: the result of ``socket.getfqdn()``, or ``unknown_hostname``
              when that result is empty or the lookup raised
              ``socket.error``
    """
    try:
        fqdn = socket.getfqdn()
    except socket.error:
        return unknown_hostname
    # An empty lookup result is treated the same as a failed lookup.
    return fqdn if fqdn else unknown_hostname
|
||||
|
||||
|
||||
def match_type(obj, matchers):
|
||||
"""Matches a given object using the given matchers list/iterable.
|
||||
|
||||
|
@ -15,6 +15,7 @@
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import multiprocessing
|
||||
import threading
|
||||
|
||||
import six
|
||||
@ -35,6 +36,17 @@ def get_ident():
|
||||
return _thread.get_ident()
|
||||
|
||||
|
||||
def get_optimal_thread_count(default=2):
    """Guess a reasonable thread count for the current system.

    :param default: value returned when the cpu count is not available
    :returns: the cpu count plus one, or ``default`` when
              ``multiprocessing.cpu_count()`` raises
              ``NotImplementedError``
    """
    try:
        cpus = multiprocessing.cpu_count()
    except NotImplementedError:
        # NOTE(harlowja): the cpu count lookup may raise; since it's hard
        # to know what else to do in that situation just hand back the
        # provided default.
        return default
    return cpus + 1
|
||||
|
||||
|
||||
def daemon_thread(target, *args, **kwargs):
|
||||
"""Makes a daemon thread that calls the given target when started."""
|
||||
thread = threading.Thread(target=target, args=args, kwargs=kwargs)
|
||||
|
Loading…
Reference in New Issue
Block a user