From 15aa5cbda810ef3f757e9e54280fd8216dc9ef7d Mon Sep 17 00:00:00 2001
From: Mehdi Abaakouk <mehdi.abaakouk@enovance.com>
Date: Mon, 8 Dec 2014 10:52:45 +0100
Subject: [PATCH] The executor doesn't need to set the timeout

It's up to the driver to set a suitable timeout for polling the broker,
this one can be different that the one requested by the driver
caller as long as the caller timeout is respected.

This change also adds a new driver listener API, to be able
to stop it cleanly, specially in case of timeout=None.

Closes bug: #1400268
Closes bug: #1399257
Change-Id: I674c0def1efb420c293897d49683593a0b10e291
---
 oslo/messaging/_drivers/amqpdriver.py      | 26 +++++++++-------------
 oslo/messaging/_drivers/base.py            |  8 ++++++-
 oslo/messaging/_drivers/impl_fake.py       |  6 ++++-
 oslo/messaging/_drivers/impl_qpid.py       | 13 +++++++++++
 oslo/messaging/_drivers/impl_rabbit.py     | 11 +++++++++
 oslo/messaging/_executors/base.py          |  4 ----
 oslo/messaging/_executors/impl_blocking.py |  3 ++-
 oslo/messaging/_executors/impl_eventlet.py |  3 ++-
 tests/executors/test_executor.py           | 14 ++++--------
 9 files changed, 54 insertions(+), 34 deletions(-)

diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py
index 41129e7c2..6e1451807 100644
--- a/oslo/messaging/_drivers/amqpdriver.py
+++ b/oslo/messaging/_drivers/amqpdriver.py
@@ -17,7 +17,6 @@ __all__ = ['AMQPDriverBase']
 
 import logging
 import threading
-import time
 import uuid
 
 from six import moves
@@ -94,6 +93,7 @@ class AMQPListener(base.Listener):
         self.conn = conn
         self.msg_id_cache = rpc_amqp._MsgIdCache()
         self.incoming = []
+        self._stopped = threading.Event()
 
     def __call__(self, message):
         # FIXME(markmc): logging isn't driver specific
@@ -110,23 +110,17 @@ class AMQPListener(base.Listener):
                                                  ctxt.reply_q))
 
     def poll(self, timeout=None):
-        if timeout is not None:
-            deadline = time.time() + timeout
-        else:
-            deadline = None
-        while True:
+        while not self._stopped.is_set():
             if self.incoming:
                 return self.incoming.pop(0)
-            if deadline is not None:
-                timeout = deadline - time.time()
-                if timeout < 0:
-                    return None
-                try:
-                    self.conn.consume(limit=1, timeout=timeout)
-                except rpc_common.Timeout:
-                    return None
-            else:
-                self.conn.consume(limit=1)
+            try:
+                self.conn.consume(limit=1, timeout=timeout)
+            except rpc_common.Timeout:
+                return None
+
+    def stop(self):
+        self._stopped.set()
+        self.conn.stop_consuming()
 
     def cleanup(self):
         # Closes listener connection
diff --git a/oslo/messaging/_drivers/base.py b/oslo/messaging/_drivers/base.py
index ec24460f5..ffaebe206 100644
--- a/oslo/messaging/_drivers/base.py
+++ b/oslo/messaging/_drivers/base.py
@@ -56,9 +56,15 @@ class Listener(object):
     def poll(self, timeout=None):
         """Blocking until a message is pending and return IncomingMessage.
         Return None after timeout seconds if timeout is set and no message is
-        ending.
+        ending or if the listener have been stopped.
         """
 
+    def stop(self):
+        """Stop listener.
+        Stop the listener message polling
+        """
+        pass
+
     def cleanup(self):
         """Cleanup listener.
         Close connection used by listener if any. For some listeners like
diff --git a/oslo/messaging/_drivers/impl_fake.py b/oslo/messaging/_drivers/impl_fake.py
index 457ef0a22..1a7e87a7b 100644
--- a/oslo/messaging/_drivers/impl_fake.py
+++ b/oslo/messaging/_drivers/impl_fake.py
@@ -46,6 +46,7 @@ class FakeListener(base.Listener):
         self._exchange_manager = exchange_manager
         self._targets = targets
         self._pool = pool
+        self._stopped = threading.Event()
 
         # NOTE(sileht): Ensure that all needed queues exists even the listener
         # have not been polled yet
@@ -58,7 +59,7 @@ class FakeListener(base.Listener):
             deadline = time.time() + timeout
         else:
             deadline = None
-        while True:
+        while not self._stopped.is_set():
             for target in self._targets:
                 exchange = self._exchange_manager.get_exchange(target.exchange)
                 (ctxt, message, reply_q, requeue) = exchange.poll(target,
@@ -77,6 +78,9 @@ class FakeListener(base.Listener):
             time.sleep(pause)
         return None
 
+    def stop(self):
+        self._stopped.set()
+
 
 class FakeExchange(object):
 
diff --git a/oslo/messaging/_drivers/impl_qpid.py b/oslo/messaging/_drivers/impl_qpid.py
index 43c7f57e3..8c9b86d8c 100644
--- a/oslo/messaging/_drivers/impl_qpid.py
+++ b/oslo/messaging/_drivers/impl_qpid.py
@@ -460,6 +460,8 @@ class Connection(object):
         self.consumers = {}
         self.conf = conf
 
+        self._consume_loop_stopped = False
+
         self.brokers_params = []
         if url.hosts:
             for host in url.hosts:
@@ -651,8 +653,16 @@ class Connection(object):
             LOG.exception(_('Failed to consume message from queue: %s'), exc)
 
         def _consume():
+            # NOTE(sileht):
+            # maximun value choosen according the best practice from kombu:
+            # http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop
             poll_timeout = 1 if timeout is None else min(timeout, 1)
+
             while True:
+                if self._consume_loop_stopped:
+                    self._consume_loop_stopped = False
+                    raise StopIteration
+
                 try:
                     nxt_receiver = self.session.next_receiver(
                         timeout=poll_timeout)
@@ -745,6 +755,9 @@ class Connection(object):
             except StopIteration:
                 return
 
+    def stop_consuming(self):
+        self._consume_loop_stopped = True
+
 
 class QpidDriver(amqpdriver.AMQPDriverBase):
 
diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py
index 05219ffcd..4d9d7c893 100644
--- a/oslo/messaging/_drivers/impl_rabbit.py
+++ b/oslo/messaging/_drivers/impl_rabbit.py
@@ -497,6 +497,7 @@ class Connection(object):
         self._initial_pid = os.getpid()
 
         self.do_consume = True
+        self._consume_loop_stopped = False
 
         self.channel = None
         self.connection = kombu.connection.Connection(
@@ -715,8 +716,15 @@ class Connection(object):
                 queues_tail.consume(nowait=False)
                 self.do_consume = False
 
+            # NOTE(sileht):
+            # maximun value choosen according the best practice from kombu:
+            # http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop
             poll_timeout = 1 if timeout is None else min(timeout, 1)
             while True:
+                if self._consume_loop_stopped:
+                    self._consume_loop_stopped = False
+                    raise StopIteration
+
                 try:
                     return self.connection.drain_events(timeout=poll_timeout)
                 except socket.timeout as exc:
@@ -790,6 +798,9 @@ class Connection(object):
             except StopIteration:
                 return
 
+    def stop_consuming(self):
+        self._consume_loop_stopped = True
+
 
 class RabbitDriver(amqpdriver.AMQPDriverBase):
 
diff --git a/oslo/messaging/_executors/base.py b/oslo/messaging/_executors/base.py
index 095394ffb..8019017b3 100644
--- a/oslo/messaging/_executors/base.py
+++ b/oslo/messaging/_executors/base.py
@@ -16,10 +16,6 @@ import abc
 
 import six
 
-# NOTE(sileht): value choosen according the best practice from kombu
-# http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop
-POLL_TIMEOUT = 1
-
 
 @six.add_metaclass(abc.ABCMeta)
 class ExecutorBase(object):
diff --git a/oslo/messaging/_executors/impl_blocking.py b/oslo/messaging/_executors/impl_blocking.py
index d659482f8..1b039b991 100644
--- a/oslo/messaging/_executors/impl_blocking.py
+++ b/oslo/messaging/_executors/impl_blocking.py
@@ -42,7 +42,7 @@ class BlockingExecutor(base.ExecutorBase):
         self._running = True
         while self._running:
             try:
-                incoming = self.listener.poll(timeout=base.POLL_TIMEOUT)
+                incoming = self.listener.poll()
                 if incoming is not None:
                     with self.dispatcher(incoming) as callback:
                         callback()
@@ -51,6 +51,7 @@ class BlockingExecutor(base.ExecutorBase):
 
     def stop(self):
         self._running = False
+        self.listener.stop()
 
     def wait(self):
         pass
diff --git a/oslo/messaging/_executors/impl_eventlet.py b/oslo/messaging/_executors/impl_eventlet.py
index 13eeeb108..c3391675a 100644
--- a/oslo/messaging/_executors/impl_eventlet.py
+++ b/oslo/messaging/_executors/impl_eventlet.py
@@ -85,7 +85,7 @@ class EventletExecutor(base.ExecutorBase):
         def _executor_thread():
             try:
                 while self._running:
-                    incoming = self.listener.poll(timeout=base.POLL_TIMEOUT)
+                    incoming = self.listener.poll()
                     if incoming is not None:
                         spawn_with(ctxt=self.dispatcher(incoming),
                                    pool=self._greenpool)
@@ -99,6 +99,7 @@ class EventletExecutor(base.ExecutorBase):
         if self._thread is None:
             return
         self._running = False
+        self.listener.stop()
         self._thread.cancel()
 
     def wait(self):
diff --git a/tests/executors/test_executor.py b/tests/executors/test_executor.py
index ef247c288..445e2283b 100644
--- a/tests/executors/test_executor.py
+++ b/tests/executors/test_executor.py
@@ -39,12 +39,10 @@ class TestExecutor(test_utils.BaseTestCase):
 
     @classmethod
     def generate_scenarios(cls):
-        impl = [('blocking', dict(executor=impl_blocking.BlockingExecutor,
-                                  stop_before_return=True))]
+        impl = [('blocking', dict(executor=impl_blocking.BlockingExecutor))]
         if impl_eventlet is not None:
             impl.append(
-                ('eventlet', dict(executor=impl_eventlet.EventletExecutor,
-                                  stop_before_return=False)))
+                ('eventlet', dict(executor=impl_eventlet.EventletExecutor)))
         cls.scenarios = testscenarios.multiply_scenarios(impl)
 
     @staticmethod
@@ -72,13 +70,9 @@ class TestExecutor(test_utils.BaseTestCase):
                                           message={'payload': 'data'})
 
         def fake_poll(timeout=None):
-            if self.stop_before_return:
-                executor.stop()
+            if listener.poll.call_count == 1:
                 return incoming_message
-            else:
-                if listener.poll.call_count == 1:
-                    return incoming_message
-                executor.stop()
+            executor.stop()
 
         listener.poll.side_effect = fake_poll