Notify on the individual engine steps

When a conductor is running it is quite useful to be able to how
long each engine step takes. To enable this information being
output, add a notifier to the base conductor and use it in the
blocking conductor to emit events around engine activities. This
makes it possible to track the timing (or other information that
can be gathered from these events) in a non-intrusive manner.

In the `99_bottles.py` demo we also now use this to be able to
easily see what the conductor is actively doing (without having
to enable the more verbose DEBUG level logging).

Change-Id: Ifd8ff38f82fc8135fe5fec4c8e41f0e06f4fdee3
This commit is contained in:
Joshua Harlow 2015-06-08 22:16:47 -07:00 committed by Min Pae
parent 40d19c7696
commit 63c6730248
3 changed files with 80 additions and 7 deletions

View File

@ -56,6 +56,16 @@ class BlockingConductor(base.Conductor):
upon the jobboard capabilities to automatically abandon these jobs. upon the jobboard capabilities to automatically abandon these jobs.
""" """
START_FINISH_EVENTS_EMITTED = tuple([
'compilation', 'preparation',
'validation', 'running',
])
"""Events will be emitted for the start and finish of each engine
activity defined above, the actual event name that can be registered
to subscribe to will be ``${event}_start`` and ``${event}_end`` where
the ``${event}`` in this pseudo-variable will be one of these events.
"""
def __init__(self, name, jobboard, def __init__(self, name, jobboard,
persistence=None, engine=None, persistence=None, engine=None,
engine_options=None, wait_timeout=None): engine_options=None, wait_timeout=None):
@ -105,10 +115,24 @@ class BlockingConductor(base.Conductor):
with ExitStack() as stack: with ExitStack() as stack:
for listener in listeners: for listener in listeners:
stack.enter_context(listener) stack.enter_context(listener)
LOG.debug("Dispatching engine %s for job: %s", engine, job) LOG.debug("Dispatching engine for job '%s'", job)
consume = True consume = True
try: try:
engine.run() for stage_func, event_name in [(engine.compile, 'compilation'),
(engine.prepare, 'preparation'),
(engine.validate, 'validation'),
(engine.run, 'running')]:
self._notifier.notify("%s_start" % event_name, {
'job': job,
'engine': engine,
'conductor': self,
})
stage_func()
self._notifier.notify("%s_end" % event_name, {
'job': job,
'engine': engine,
'conductor': self,
})
except excp.WrappedFailure as e: except excp.WrappedFailure as e:
if all((f.check(*NO_CONSUME_EXCEPTIONS) for f in e)): if all((f.check(*NO_CONSUME_EXCEPTIONS) for f in e)):
consume = False consume = False

View File

@ -20,6 +20,7 @@ import six
from taskflow import engines from taskflow import engines
from taskflow import exceptions as excp from taskflow import exceptions as excp
from taskflow.types import notifier
@six.add_metaclass(abc.ABCMeta) @six.add_metaclass(abc.ABCMeta)
@ -45,6 +46,18 @@ class Conductor(object):
self._engine_options = engine_options.copy() self._engine_options = engine_options.copy()
self._persistence = persistence self._persistence = persistence
self._lock = threading.RLock() self._lock = threading.RLock()
self._notifier = notifier.Notifier()
@property
def notifier(self):
"""The conductor actions (or other state changes) notifier.
NOTE(harlowja): different conductor implementations may emit
different events + event details at different times, so refer to your
conductor documentation to know exactly what can and what can not be
subscribed to.
"""
return self._notifier
def _flow_detail_from_job(self, job): def _flow_detail_from_job(self, job):
"""Extracts a flow detail from a job (via some manner). """Extracts a flow detail from a job (via some manner).

View File

@ -34,6 +34,7 @@ from taskflow.patterns import linear_flow as lf
from taskflow.persistence import backends as persistence_backends from taskflow.persistence import backends as persistence_backends
from taskflow.persistence import logbook from taskflow.persistence import logbook
from taskflow import task from taskflow import task
from taskflow.types import timing
from oslo_utils import uuidutils from oslo_utils import uuidutils
@ -61,10 +62,11 @@ HOW_MANY_BOTTLES = 99
class TakeABottleDown(task.Task): class TakeABottleDown(task.Task):
def execute(self): def execute(self, bottles_left):
sys.stdout.write('Take one down, ') sys.stdout.write('Take one down, ')
sys.stdout.flush() sys.stdout.flush()
time.sleep(TAKE_DOWN_DELAY) time.sleep(TAKE_DOWN_DELAY)
return bottles_left - 1
class PassItAround(task.Task): class PassItAround(task.Task):
@ -82,16 +84,49 @@ class Conclusion(task.Task):
def make_bottles(count): def make_bottles(count):
s = lf.Flow("bottle-song") s = lf.Flow("bottle-song")
for bottle in reversed(list(range(1, count + 1))):
take_bottle = TakeABottleDown("take-bottle-%s" % bottle) take_bottle = TakeABottleDown("take-bottle-%s" % count,
inject={'bottles_left': count},
provides='bottles_left')
pass_it = PassItAround("pass-%s-around" % count)
next_bottles = Conclusion("next-bottles-%s" % (count - 1))
s.add(take_bottle, pass_it, next_bottles)
for bottle in reversed(list(range(1, count))):
take_bottle = TakeABottleDown("take-bottle-%s" % bottle,
provides='bottles_left')
pass_it = PassItAround("pass-%s-around" % bottle) pass_it = PassItAround("pass-%s-around" % bottle)
next_bottles = Conclusion("next-bottles-%s" % (bottle - 1), next_bottles = Conclusion("next-bottles-%s" % (bottle - 1))
inject={"bottles_left": bottle - 1})
s.add(take_bottle, pass_it, next_bottles) s.add(take_bottle, pass_it, next_bottles)
return s return s
def run_conductor(): def run_conductor():
event_watches = {}
# This will be triggered by the conductor doing various activities
# with engines, and is quite nice to be able to see the various timing
# segments (which is useful for debugging, or watching, or figuring out
# where to optimize).
def on_conductor_event(event, details):
print("Event '%s' has been received..." % event)
print("Details = %s" % details)
if event.endswith("_start"):
w = timing.StopWatch()
w.start()
base_event = event[0:-len("_start")]
event_watches[base_event] = w
if event.endswith("_end"):
base_event = event[0:-len("_end")]
try:
w = event_watches.pop(base_event)
w.stop()
print("It took %0.3f seconds for event '%s' to finish"
% (w.elapsed(), base_event))
except KeyError:
pass
print("Starting conductor with pid: %s" % ME) print("Starting conductor with pid: %s" % ME)
my_name = "conductor-%s" % ME my_name = "conductor-%s" % ME
persist_backend = persistence_backends.fetch(PERSISTENCE_URI) persist_backend = persistence_backends.fetch(PERSISTENCE_URI)
@ -104,6 +139,7 @@ def run_conductor():
with contextlib.closing(job_backend): with contextlib.closing(job_backend):
cond = conductor_backends.fetch('blocking', my_name, job_backend, cond = conductor_backends.fetch('blocking', my_name, job_backend,
persistence=persist_backend) persistence=persist_backend)
cond.notifier.register(cond.notifier.ANY, on_conductor_event)
# Run forever, and kill -9 or ctrl-c me... # Run forever, and kill -9 or ctrl-c me...
try: try:
cond.run() cond.run()