diff --git a/taskflow/conductors/backends/impl_blocking.py b/taskflow/conductors/backends/impl_blocking.py index fb8a3c3a7..945f591bd 100644 --- a/taskflow/conductors/backends/impl_blocking.py +++ b/taskflow/conductors/backends/impl_blocking.py @@ -18,6 +18,7 @@ except ImportError: from contextlib2 import ExitStack # noqa from debtcollector import removals +from oslo_utils import excutils import six from taskflow.conductors import base @@ -151,6 +152,9 @@ class BlockingConductor(base.Conductor): consume = False try: f = self._dispatch_job(job) + except KeyboardInterrupt: + with excutils.save_and_reraise_exception(): + LOG.warn("Job dispatching interrupted: %s", job) except Exception: LOG.warn("Job dispatching failed: %s", job, exc_info=True) diff --git a/taskflow/examples/99_bottles.py b/taskflow/examples/99_bottles.py index 9959255b7..90894e9c9 100644 --- a/taskflow/examples/99_bottles.py +++ b/taskflow/examples/99_bottles.py @@ -54,33 +54,47 @@ JB_CONF = { 'board': 'zookeeper', 'path': '/taskflow/99-bottles-demo', } -DB_URI = r"sqlite:////tmp/bottles.db" -PART_DELAY = 1.0 +PERSISTENCE_URI = r"sqlite:////tmp/bottles.db" +TAKE_DOWN_DELAY = 1.0 +PASS_AROUND_DELAY = 3.0 HOW_MANY_BOTTLES = 99 -class TakeABottleDownPassItAround(task.Task): - def execute(self, bottles_left): +class TakeABottleDown(task.Task): + def execute(self): sys.stdout.write('Take one down, ') - time.sleep(PART_DELAY) + sys.stdout.flush() + time.sleep(TAKE_DOWN_DELAY) + + +class PassItAround(task.Task): + def execute(self): sys.stdout.write('pass it around, ') - time.sleep(PART_DELAY) + sys.stdout.flush() + time.sleep(PASS_AROUND_DELAY) + + +class Conclusion(task.Task): + def execute(self, bottles_left): sys.stdout.write('%s bottles of beer on the wall...\n' % bottles_left) + sys.stdout.flush() def make_bottles(count): s = lf.Flow("bottle-song") for bottle in reversed(list(range(1, count + 1))): - t = TakeABottleDownPassItAround("take-bottle-%s" % bottle, - inject={"bottles_left": bottle - 1}) - s.add(t) + take_bottle = TakeABottleDown("take-bottle-%s" % bottle) + pass_it = PassItAround("pass-%s-around" % bottle) + next_bottles = Conclusion("next-bottles-%s" % (bottle - 1), + inject={"bottles_left": bottle - 1}) + s.add(take_bottle, pass_it, next_bottles) return s def run_conductor(): print("Starting conductor with pid: %s" % ME) my_name = "conductor-%s" % ME - persist_backend = persistence_backends.fetch(DB_URI) + persist_backend = persistence_backends.fetch(PERSISTENCE_URI) with contextlib.closing(persist_backend): with contextlib.closing(persist_backend.get_connection()) as conn: conn.upgrade() @@ -90,17 +104,18 @@ def run_conductor(): with contextlib.closing(job_backend): cond = conductor_backends.fetch('blocking', my_name, job_backend, persistence=persist_backend) - # Run forever, and kill -9 me... - # - # TODO(harlowja): it would be nicer if we could handle - # ctrl-c better... - cond.run() + # Run forever, and kill -9 or ctrl-c me... + try: + cond.run() + finally: + cond.stop() + cond.wait() def run_poster(): print("Starting poster with pid: %s" % ME) my_name = "poster-%s" % ME - persist_backend = persistence_backends.fetch(DB_URI) + persist_backend = persistence_backends.fetch(PERSISTENCE_URI) with contextlib.closing(persist_backend): with contextlib.closing(persist_backend.get_connection()) as conn: conn.upgrade() @@ -128,11 +143,12 @@ def run_poster(): def main(): if len(sys.argv) == 1: sys.stderr.write("%s p|c\n" % os.path.basename(sys.argv[0])) - return - if sys.argv[1] == 'p': + elif sys.argv[1] == 'p': run_poster() - if sys.argv[1] == 'c': + elif sys.argv[1] == 'c': run_conductor() + else: + sys.stderr.write("%s p|c\n" % os.path.basename(sys.argv[0])) if __name__ == '__main__':