Adds a single threaded flow conductor

Creates a new conductor module that can be used
to connect into the jobboard, engine, and persistence
mechanism.

This commit adds in support for a simple conductor
that will run jobs in its own thread and will dispatch
them, and consume/abandon them.

Implements: blueprint generic-flow-conductor

Change-Id: Ic610bc825506db57b0c4364b0fc588b51d453a76
This commit is contained in:
Dan Krause 2014-05-16 09:38:55 -05:00 committed by Joshua Harlow
parent 268d935a0a
commit c386a5f9d4
8 changed files with 408 additions and 0 deletions

View File

View File

@ -0,0 +1,90 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import threading
import six
import taskflow.engines
from taskflow import exceptions as excp
from taskflow.utils import lock_utils
@six.add_metaclass(abc.ABCMeta)
class Conductor(object):
"""Conductors act as entities which extract jobs from a jobboard, assign
there work to some engine (using some desired configuration) and then wait
for that work to complete. If the work fails then they abandon the claimed
work (or if the process they are running in crashes or dies this
abandonment happens automatically) and then another conductor at a later
period of time will finish up the prior failed conductors work.
"""
def __init__(self, name, jobboard, engine_conf, persistence):
self._name = name
self._jobboard = jobboard
self._engine_conf = engine_conf
self._persistence = persistence
self._lock = threading.RLock()
def _engine_from_job(self, job):
try:
flow_uuid = job.details["flow_uuid"]
except (KeyError, TypeError):
raise excp.NotFound("No flow detail uuid found in job")
else:
try:
flow_detail = job.book.find(flow_uuid)
except (TypeError, AttributeError):
flow_detail = None
if flow_detail is None:
raise excp.NotFound("No matching flow detail found in"
" job for flow detail uuid %s" % flow_uuid)
try:
store = dict(job.details["store"])
except (KeyError, TypeError):
store = {}
return taskflow.engines.load_from_detail(
flow_detail,
store=store,
engine_conf=dict(self._engine_conf),
backend=self._persistence)
@lock_utils.locked
def connect(self):
"""Ensures the jobboard is connected (noop if it is already)."""
if not self._jobboard.connected:
self._jobboard.connect()
@lock_utils.locked
def close(self):
"""Closes the jobboard, disallowing further use."""
self._jobboard.close()
@abc.abstractmethod
def run(self):
"""Continuously claims, runs, and consumes jobs, and waits for more
jobs when there are none left on the jobboard.
"""
@abc.abstractmethod
def _dispatch_job(self, job):
"""Accepts a single (already claimed) job and causes it to be run in
an engine. The job is consumed upon completion (unless False is
returned which will signify the job should be abandoned instead)
:param job: A Job instance that has already been claimed by the
jobboard.
"""

View File

@ -0,0 +1,150 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import threading
import six
from taskflow.conductors import base
from taskflow import exceptions as excp
from taskflow.listeners import logging as logging_listener
from taskflow.utils import lock_utils
from taskflow.utils import misc
LOG = logging.getLogger(__name__)
WAIT_TIMEOUT = 0.5
NO_CONSUME_EXCEPTIONS = tuple([
excp.ExecutionFailure,
excp.StorageFailure,
])
class SingleThreadedConductor(base.Conductor):
"""A conductor that runs jobs in its own dispatching loop.
This conductor iterates over jobs in the provided jobboard (waiting for
the given timeout if no jobs exist) and attempts to claim them, work on
those jobs in its local thread (blocking further work from being claimed
and consumed) and then consume those work units after completetion. This
process will repeat until the conductor has been stopped or other critical
error occurs.
NOTE(harlowja): consumption occurs even if a engine fails to run due to
a task failure. This is only skipped when an execution failure or
a storage failure occurs which are *usually* correctable by re-running on
a different conductor (storage failures and execution failures may be
transient issues that can be worked around by later execution). If a job
after completing can not be consumed or abandoned the conductor relies
upon the jobboard capabilities to automatically abandon these jobs.
"""
def __init__(self, name, jobboard, engine_conf, persistence,
wait_timeout=None):
super(SingleThreadedConductor, self).__init__(name, jobboard,
engine_conf,
persistence)
if wait_timeout is None:
wait_timeout = WAIT_TIMEOUT
if isinstance(wait_timeout, (int, float) + six.string_types):
self._wait_timeout = misc.Timeout(float(wait_timeout))
elif isinstance(wait_timeout, misc.Timeout):
self._wait_timeout = wait_timeout
else:
raise ValueError("Invalid timeout literal: %s" % (wait_timeout))
self._dead = threading.Event()
@lock_utils.locked
def stop(self, timeout=None):
"""Stops dispatching and returns whether the dispatcher loop is active
or whether it has ceased. If a timeout is provided the dispatcher
loop may not have ceased by the timeout reached (the request to cease
will be honored in the future).
"""
self._wait_timeout.interrupt()
self._dead.wait(timeout)
return self.dispatching
@property
def dispatching(self):
if self._dead.is_set():
return False
return True
def _dispatch_job(self, job):
LOG.info("Dispatching job: %s", job)
try:
engine = self._engine_from_job(job)
except Exception as e:
raise excp.ConductorFailure("Failed creating an engine", cause=e)
with logging_listener.LoggingListener(engine, log=LOG):
consume = True
try:
engine.run()
except excp.WrappedFailure as e:
if all((f.check(*NO_CONSUME_EXCEPTIONS) for f in e)):
LOG.warn("Job execution failed (consumption being"
" skipped): %s", job, exc_info=True)
consume = False
else:
LOG.warn("Job execution failed: %s", job, exc_info=True)
except NO_CONSUME_EXCEPTIONS:
LOG.warn("Job execution failed (consumption being"
" skipped): %s", job, exc_info=True)
consume = False
except Exception:
LOG.warn("Job execution failed: %s", job, exc_info=True)
else:
LOG.info("Job completed successfully: %s", job)
return consume
def run(self):
self._dead.clear()
try:
while True:
if self._wait_timeout.is_stopped():
break
dispatched = 0
for job in self._jobboard.iterjobs():
if self._wait_timeout.is_stopped():
break
LOG.debug("Trying to claim job: %s", job)
try:
self._jobboard.claim(job, self._name)
except (excp.UnclaimableJob, excp.NotFound):
LOG.debug("Job already claimed or consumed: %s", job)
continue
dispatched += 1
try:
consume = self._dispatch_job(job)
except excp.ConductorFailure:
LOG.warn("Job dispatching failed: %s", job,
exc_info=True)
else:
try:
if consume:
self._jobboard.consume(job, self._name)
else:
self._jobboard.abandon(job, self._name)
except excp.JobFailure:
if consume:
LOG.warn("Failed job consumption: %s", job,
exc_info=True)
else:
LOG.warn("Failed job abandonment: %s", job,
exc_info=True)
if dispatched == 0 and not self._wait_timeout.is_stopped():
self._wait_timeout.wait()
finally:
self._dead.set()

View File

@ -61,6 +61,12 @@ class StorageFailure(TaskFlowException):
"""Raised when storage backends can not be read/saved/deleted."""
# Conductor related errors.
class ConductorFailure(TaskFlowException):
"""Errors related to conducting activities."""
# Job related errors.
class JobFailure(TaskFlowException):

View File

@ -99,3 +99,8 @@ class Job(object):
def name(self):
"""The non-uniquely identifying name of this job."""
return self._name
def __str__(self):
"""Pretty formats the job into something *more* meaningful."""
return "%s %s (%s): %s" % (type(self).__name__,
self.name, self.uuid, self.details)

View File

@ -154,6 +154,10 @@ class JobBoard(object):
this must be the same name that was used for claiming this job.
"""
@abc.abstractproperty
def connected(self):
"""Returns if this jobboard is connected."""
@abc.abstractmethod
def connect(self):
"""Opens the connection to any backend system."""

View File

@ -0,0 +1,153 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import threading
from zake import fake_client
from taskflow.conductors import single_threaded as stc
from taskflow import engines
from taskflow.jobs.backends import impl_zookeeper
from taskflow.jobs import jobboard
from taskflow.patterns import linear_flow as lf
from taskflow.persistence.backends import impl_memory
from taskflow import states as st
from taskflow import test
from taskflow.tests import utils as test_utils
from taskflow.utils import misc
from taskflow.utils import persistence_utils as pu
@contextlib.contextmanager
def close_many(*closeables):
try:
yield
finally:
for c in closeables:
c.close()
def test_factory(blowup):
f = lf.Flow("test")
if not blowup:
f.add(test_utils.SaveOrderTask('test1'))
else:
f.add(test_utils.FailingTask("test1"))
return f
def make_thread(conductor):
t = threading.Thread(target=conductor.run)
t.daemon = True
return t
class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase):
def make_components(self, name='testing', wait_timeout=0.1):
client = fake_client.FakeClient()
persistence = impl_memory.MemoryBackend()
board = impl_zookeeper.ZookeeperJobBoard(name, {},
client=client,
persistence=persistence)
engine_conf = {
'engine': 'default',
}
conductor = stc.SingleThreadedConductor(name, board, engine_conf,
persistence, wait_timeout)
return misc.AttrDict(board=board,
client=client,
persistence=persistence,
conductor=conductor)
def test_connection(self):
components = self.make_components()
components.conductor.connect()
with close_many(components.conductor, components.client):
self.assertTrue(components.board.connected)
self.assertTrue(components.client.connected)
self.assertFalse(components.board.connected)
self.assertFalse(components.client.connected)
def test_run_empty(self):
components = self.make_components()
components.conductor.connect()
with close_many(components.conductor, components.client):
t = make_thread(components.conductor)
t.start()
self.assertFalse(components.conductor.stop(0.5))
t.join()
def test_run(self):
components = self.make_components()
components.conductor.connect()
consumed_event = threading.Event()
def on_consume(state, details):
consumed_event.set()
components.board.notifier.register(jobboard.REMOVAL, on_consume)
with close_many(components.conductor, components.client):
t = make_thread(components.conductor)
t.start()
lb, fd = pu.temporary_flow_detail(components.persistence)
engines.save_factory_details(fd, test_factory,
[False], {},
backend=components.persistence)
components.board.post('poke', lb,
details={'flow_uuid': fd.uuid})
consumed_event.wait(1.0)
self.assertTrue(consumed_event.is_set())
components.conductor.stop(1.0)
self.assertFalse(components.conductor.dispatching)
persistence = components.persistence
with contextlib.closing(persistence.get_connection()) as conn:
lb = conn.get_logbook(lb.uuid)
fd = lb.find(fd.uuid)
self.assertIsNotNone(fd)
self.assertEqual(st.SUCCESS, fd.state)
def test_fail_run(self):
components = self.make_components()
components.conductor.connect()
consumed_event = threading.Event()
def on_consume(state, details):
consumed_event.set()
components.board.notifier.register(jobboard.REMOVAL, on_consume)
with close_many(components.conductor, components.client):
t = make_thread(components.conductor)
t.start()
lb, fd = pu.temporary_flow_detail(components.persistence)
engines.save_factory_details(fd, test_factory,
[True], {},
backend=components.persistence)
components.board.post('poke', lb,
details={'flow_uuid': fd.uuid})
consumed_event.wait(1.0)
self.assertTrue(consumed_event.is_set())
components.conductor.stop(1.0)
self.assertFalse(components.conductor.dispatching)
persistence = components.persistence
with contextlib.closing(persistence.get_connection()) as conn:
lb = conn.get_logbook(lb.uuid)
fd = lb.find(fd.uuid)
self.assertIsNotNone(fd)
self.assertEqual(st.REVERTED, fd.state)