Handle conductor ctrl-c more appropriately

When a conductor program is interrupted via ctrl-c
or equivalent it is much nicer log that that has happened
and to reraise that exception.

This also slightly tweaks the 99 bottles song to make it
even better, by having more pieces/tasks, which makes it
possible to kill the program during each task and see how
the resumption works when a flow is composed of segments.

Change-Id: I5d242eba9a043ef96646ba74ea5928daa0691ed0
This commit is contained in:
Joshua Harlow 2015-06-08 19:16:16 -07:00 committed by Joshua Harlow
parent 4f867db055
commit 40d19c7696
2 changed files with 39 additions and 19 deletions

View File

@ -18,6 +18,7 @@ except ImportError:
from contextlib2 import ExitStack # noqa
from debtcollector import removals
from oslo_utils import excutils
import six
from taskflow.conductors import base
@ -151,6 +152,9 @@ class BlockingConductor(base.Conductor):
consume = False
try:
f = self._dispatch_job(job)
except KeyboardInterrupt:
with excutils.save_and_reraise_exception():
LOG.warn("Job dispatching interrupted: %s", job)
except Exception:
LOG.warn("Job dispatching failed: %s", job,
exc_info=True)

View File

@ -54,33 +54,47 @@ JB_CONF = {
'board': 'zookeeper',
'path': '/taskflow/99-bottles-demo',
}
DB_URI = r"sqlite:////tmp/bottles.db"
PART_DELAY = 1.0
PERSISTENCE_URI = r"sqlite:////tmp/bottles.db"
TAKE_DOWN_DELAY = 1.0
PASS_AROUND_DELAY = 3.0
HOW_MANY_BOTTLES = 99
class TakeABottleDownPassItAround(task.Task):
def execute(self, bottles_left):
class TakeABottleDown(task.Task):
def execute(self):
sys.stdout.write('Take one down, ')
time.sleep(PART_DELAY)
sys.stdout.flush()
time.sleep(TAKE_DOWN_DELAY)
class PassItAround(task.Task):
def execute(self):
sys.stdout.write('pass it around, ')
time.sleep(PART_DELAY)
sys.stdout.flush()
time.sleep(PASS_AROUND_DELAY)
class Conclusion(task.Task):
def execute(self, bottles_left):
sys.stdout.write('%s bottles of beer on the wall...\n' % bottles_left)
sys.stdout.flush()
def make_bottles(count):
s = lf.Flow("bottle-song")
for bottle in reversed(list(range(1, count + 1))):
t = TakeABottleDownPassItAround("take-bottle-%s" % bottle,
inject={"bottles_left": bottle - 1})
s.add(t)
take_bottle = TakeABottleDown("take-bottle-%s" % bottle)
pass_it = PassItAround("pass-%s-around" % bottle)
next_bottles = Conclusion("next-bottles-%s" % (bottle - 1),
inject={"bottles_left": bottle - 1})
s.add(take_bottle, pass_it, next_bottles)
return s
def run_conductor():
print("Starting conductor with pid: %s" % ME)
my_name = "conductor-%s" % ME
persist_backend = persistence_backends.fetch(DB_URI)
persist_backend = persistence_backends.fetch(PERSISTENCE_URI)
with contextlib.closing(persist_backend):
with contextlib.closing(persist_backend.get_connection()) as conn:
conn.upgrade()
@ -90,17 +104,18 @@ def run_conductor():
with contextlib.closing(job_backend):
cond = conductor_backends.fetch('blocking', my_name, job_backend,
persistence=persist_backend)
# Run forever, and kill -9 me...
#
# TODO(harlowja): it would be nicer if we could handle
# ctrl-c better...
cond.run()
# Run forever, and kill -9 or ctrl-c me...
try:
cond.run()
finally:
cond.stop()
cond.wait()
def run_poster():
print("Starting poster with pid: %s" % ME)
my_name = "poster-%s" % ME
persist_backend = persistence_backends.fetch(DB_URI)
persist_backend = persistence_backends.fetch(PERSISTENCE_URI)
with contextlib.closing(persist_backend):
with contextlib.closing(persist_backend.get_connection()) as conn:
conn.upgrade()
@ -128,11 +143,12 @@ def run_poster():
def main():
if len(sys.argv) == 1:
sys.stderr.write("%s p|c\n" % os.path.basename(sys.argv[0]))
return
if sys.argv[1] == 'p':
elif sys.argv[1] == 'p':
run_poster()
if sys.argv[1] == 'c':
elif sys.argv[1] == 'c':
run_conductor()
else:
sys.stderr.write("%s p|c\n" % os.path.basename(sys.argv[0]))
if __name__ == '__main__':