From 6c91066c72f3449fc01ab3189c9ddde3b32b570e Mon Sep 17 00:00:00 2001
From: Mehdi Abaakouk <mehdi.abaakouk@enovance.com>
Date: Thu, 30 Apr 2015 08:56:20 +0200
Subject: [PATCH] rabbit/qpid: simplify the consumer loop

The consumer loop is over engineered, it returns unused return,
iterconsume creates an iterator directly consumed by 'consume' without
special handling, and in some case kombu error callback are called when
the iterator is stopped and log useless error.

And in reality the consumer is always called when limit=1.

This change simplifies that, by removing the loop and removes all
returns stuffs.

Closes bug: #1450336

Change-Id: Ia2cb52c8577b29e74d4d2b0ed0b535102f2d55c7
---
 oslo_messaging/_drivers/amqpdriver.py         |  4 +--
 oslo_messaging/_drivers/impl_qpid.py          | 20 +++--------
 oslo_messaging/_drivers/impl_rabbit.py        | 36 ++++++-------------
 .../tests/drivers/test_impl_rabbit.py         | 10 ++++--
 tests/drivers/test_impl_rabbit.py             | 10 ++++--
 5 files changed, 30 insertions(+), 50 deletions(-)

diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index bbc4b7828..ceeb07810 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -119,7 +119,7 @@ class AMQPListener(base.Listener):
             if self.incoming:
                 return self.incoming.pop(0)
             try:
-                self.conn.consume(limit=1, timeout=timeout)
+                self.conn.consume(timeout=timeout)
             except rpc_common.Timeout:
                 return None
 
@@ -194,7 +194,7 @@ class ReplyWaiter(object):
     def poll(self):
         while not self._thread_exit_event.is_set():
             try:
-                self.conn.consume(limit=1)
+                self.conn.consume()
             except Exception:
                 LOG.exception("Failed to process incoming message, "
                               "retrying...")
diff --git a/oslo_messaging/_drivers/impl_qpid.py b/oslo_messaging/_drivers/impl_qpid.py
index 487952f9f..3a0066826 100644
--- a/oslo_messaging/_drivers/impl_qpid.py
+++ b/oslo_messaging/_drivers/impl_qpid.py
@@ -652,8 +652,8 @@ class Connection(object):
 
         return self.ensure(_connect_error, _declare_consumer)
 
-    def iterconsume(self, limit=None, timeout=None):
-        """Return an iterator that will consume from all queues/consumers."""
+    def consume(self, timeout=None):
+        """Consume from all queues/consumers."""
 
         timer = rpc_common.DecayingTimer(duration=timeout)
         timer.start()
@@ -675,7 +675,7 @@ class Connection(object):
             while True:
                 if self._consume_loop_stopped:
                     self._consume_loop_stopped = False
-                    raise StopIteration
+                    return
 
                 try:
                     nxt_receiver = self.session.next_receiver(
@@ -692,10 +692,7 @@ class Connection(object):
                 LOG.exception(_("Error processing message. "
                                 "Skipping it."))
 
-        for iteration in itertools.count(0):
-            if limit and iteration >= limit:
-                raise StopIteration
-            yield self.ensure(_error_callback, _consume)
+        self.ensure(_error_callback, _consume)
 
     def publisher_send(self, cls, topic, msg, retry=None, **kwargs):
         """Send to a publisher based on the publisher class."""
@@ -761,15 +758,6 @@ class Connection(object):
         self.publisher_send(NotifyPublisher, topic=topic, msg=msg,
                             exchange_name=exchange_name, retry=retry)
 
-    def consume(self, limit=None, timeout=None):
-        """Consume from all queues/consumers."""
-        it = self.iterconsume(limit=limit, timeout=timeout)
-        while True:
-            try:
-                six.next(it)
-            except StopIteration:
-                return
-
     def stop_consuming(self):
         self._consume_loop_stopped = True
 
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index ee2da428e..1b68a73e4 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -225,7 +225,7 @@ class ConsumerBase(object):
     def consume(self, *args, **kwargs):
         """Actually declare the consumer on the amqp channel.  This will
         start the flow of messages from the queue.  Using the
-        Connection.iterconsume() iterator will process the messages,
+        Connection.consume() will process the messages,
         calling the appropriate callback.
 
         If a callback is specified in kwargs, use that.  Otherwise,
@@ -988,11 +988,8 @@ class Connection(object):
             return self.ensure(_declare_consumer,
                                error_callback=_connect_error)
 
-    def iterconsume(self, limit=None, timeout=None):
-        """Return an iterator that will consume from all queues/consumers.
-
-        NOTE(sileht): Must be called within the connection lock
-        """
+    def consume(self, timeout=None):
+        """Consume from all queues/consumers."""
 
         timer = rpc_common.DecayingTimer(duration=timeout)
         timer.start()
@@ -1023,25 +1020,22 @@ class Connection(object):
                             else min(timeout, self._poll_timeout))
             while True:
                 if self._consume_loop_stopped:
-                    self._consume_loop_stopped = False
-                    raise StopIteration
+                    return
 
                 if self._heartbeat_supported_and_enabled():
                     self.connection.heartbeat_check(
                         rate=self.driver_conf.heartbeat_rate)
                 try:
-                    return self.connection.drain_events(timeout=poll_timeout)
+                    self.connection.drain_events(timeout=poll_timeout)
+                    return
                 except socket.timeout as exc:
                     poll_timeout = timer.check_return(
                         _raise_timeout, exc, maximum=self._poll_timeout)
 
-        for iteration in itertools.count(0):
-            if limit and iteration >= limit:
-                raise StopIteration
-            yield self.ensure(
-                _consume,
-                recoverable_error_callback=_recoverable_error_callback,
-                error_callback=_error_callback)
+        with self._connection_lock:
+            self.ensure(_consume,
+                        recoverable_error_callback=_recoverable_error_callback,
+                        error_callback=_error_callback)
 
     @staticmethod
     def _log_publisher_send_error(topic, exc):
@@ -1137,16 +1131,6 @@ class Connection(object):
         self.publisher_send(NotifyPublisher, topic, msg, timeout=None,
                             exchange_name=exchange_name, retry=retry, **kwargs)
 
-    def consume(self, limit=None, timeout=None):
-        """Consume from all queues/consumers."""
-        with self._connection_lock:
-            it = self.iterconsume(limit=limit, timeout=timeout)
-            while True:
-                try:
-                    six.next(it)
-                except StopIteration:
-                    return
-
     def stop_consuming(self):
         self._consume_loop_stopped = True
 
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index 5805c5e71..54ada6f8d 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -173,15 +173,19 @@ class TestRabbitDriverLoadSSL(test_utils.BaseTestCase):
             heartbeat=0, failover_strategy="shuffle")
 
 
-class TestRabbitIterconsume(test_utils.BaseTestCase):
+class TestRabbitConsume(test_utils.BaseTestCase):
 
-    def test_iterconsume_timeout(self):
+    def test_consume_timeout(self):
         transport = oslo_messaging.get_transport(self.conf,
                                                  'kombu+memory:////')
         self.addCleanup(transport.cleanup)
         deadline = time.time() + 3
         with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn:
-            conn.iterconsume(timeout=3)
+            # FIXME(sileht): the deadline should be 6 seconds, not 3
+            # consuming with no consumer have never worked
+            # https://bugs.launchpad.net/oslo.messaging/+bug/1450342
+            # conn.consume(timeout=3)
+
             # kombu memory transport doesn't really raise error
             # so just simulate a real driver behavior
             conn.connection.connection.recoverable_channel_errors = (IOError,)
diff --git a/tests/drivers/test_impl_rabbit.py b/tests/drivers/test_impl_rabbit.py
index f534a4ae0..f27fe31dc 100644
--- a/tests/drivers/test_impl_rabbit.py
+++ b/tests/drivers/test_impl_rabbit.py
@@ -82,14 +82,18 @@ class TestRabbitDriverLoad(test_utils.BaseTestCase):
         self.assertEqual(self.url, url)
 
 
-class TestRabbitIterconsume(test_utils.BaseTestCase):
+class TestRabbitConsume(test_utils.BaseTestCase):
 
-    def test_iterconsume_timeout(self):
+    def test_consume_timeout(self):
         transport = messaging.get_transport(self.conf, 'kombu+memory:////')
         self.addCleanup(transport.cleanup)
         deadline = time.time() + 3
         with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn:
-            conn.iterconsume(timeout=3)
+            # FIXME(sileht): the deadline should be 6 seconds, not 3
+            # consuming with no consumer have never worked
+            # https://bugs.launchpad.net/oslo.messaging/+bug/1450342
+            # conn.consume(timeout=3)
+
             # kombu memory transport doesn't really raise error
             # so just simulate a real driver behavior
             conn.connection.connection.recoverable_channel_errors = (IOError,)