From b737a92afdef3e08bca7e9ac6ebbed0647e966fd Mon Sep 17 00:00:00 2001
From: Mehdi Abaakouk <mehdi.abaakouk@enovance.com>
Date: Thu, 30 Apr 2015 17:46:42 +0200
Subject: [PATCH] rabbit: remove unused consumer interfaces

The consumer code is over engineered, it allows to override
everything, but the override is always done with functools.partial.

None of the child Class have the same signature, sometimes
the constructor use the parameter name as the parent class but for
a different purpose, that makes the code hard to read.

It's was never clear which options is passed to the queue and the
exchange at this end to kombu.

This changes removes all of that stuffs, and only use the kombu
terminology for consumer parameters.

Alse we don't hardcode anymore the tag and the channel in the consumer
class, to allow to change them without recreating a consumer object
in the futur.

Change-Id: Ie341f0c973adbda9a342cb836867345aa42652d1
---
 oslo_messaging/_drivers/impl_rabbit.py | 267 +++++++++----------------
 1 file changed, 92 insertions(+), 175 deletions(-)

diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 5dfe3ab1e..1ae65af0d 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -13,8 +13,6 @@
 #    under the License.
 
 import contextlib
-import functools
-import itertools
 import logging
 import os
 import socket
@@ -171,30 +169,42 @@ class RabbitMessage(dict):
         self._raw_message.requeue()
 
 
-class ConsumerBase(object):
-    """Consumer base class."""
+class Consumer(object):
+    """Consumer class."""
 
-    def __init__(self, channel, callback, tag, **kwargs):
-        """Declare a queue on an amqp channel.
-
-        'channel' is the amqp channel to use
-        'callback' is the callback to call when messages are received
-        'tag' is a unique ID for the consumer on the channel
-
-        queue name, exchange name, and other kombu options are
-        passed in here as a dictionary.
+    def __init__(self, conf, exchange_name, queue_name, routing_key, type,
+                 durable, auto_delete, callback, nowait=True):
+        """Init the Publisher class with the exchange_name, routing_key,
+        type, durable auto_delete
         """
+        self.queue_name = queue_name
+        self.exchange_name = exchange_name
+        self.routing_key = routing_key
+        self.auto_delete = auto_delete
+        self.durable = durable
         self.callback = callback
-        self.tag = six.text_type(tag)
-        self.kwargs = kwargs
-        self.queue = None
-        self.reconnect(channel)
+        self.type = type
+        self.nowait = nowait
+        self.queue_arguments = _get_queue_arguments(conf)
+
+        self.queue = None
+        self.exchange = kombu.entity.Exchange(
+            name=exchange_name,
+            type=type,
+            durable=self.durable,
+            auto_delete=self.auto_delete)
+
+    def declare(self, channel):
+        """Re-declare the queue after a rabbit (re)connect."""
+        self.queue = kombu.entity.Queue(
+            name=self.queue_name,
+            channel=channel,
+            exchange=self.exchange,
+            durable=self.durable,
+            auto_delete=self.auto_delete,
+            routing_key=self.routing_key,
+            queue_arguments=self.queue_arguments)
 
-    def reconnect(self, channel):
-        """Re-declare the queue after a rabbit reconnect."""
-        self.channel = channel
-        self.kwargs['channel'] = channel
-        self.queue = kombu.entity.Queue(**self.kwargs)
         try:
             self.queue.declare()
         except Exception as e:
@@ -210,149 +220,34 @@ class ConsumerBase(object):
             LOG.error(_("Declaring queue failed with (%s), retrying"), e)
             self.queue.declare()
 
-    def _callback_handler(self, message, callback):
+    def consume(self, tag):
+        """Actually declare the consumer on the amqp channel.  This will
+        start the flow of messages from the queue.  Using the
+        Connection.consume() will process the messages,
+        calling the appropriate callback.
+        """
+
+        self.queue.consume(callback=self._callback,
+                           consumer_tag=six.text_type(tag),
+                           nowait=self.nowait)
+
+    def _callback(self, message):
         """Call callback with deserialized message.
 
         Messages that are processed and ack'ed.
         """
 
+        m2p = getattr(self.queue.channel, 'message_to_python', None)
+        if m2p:
+            message = m2p(message)
+
         try:
-            callback(RabbitMessage(message))
+            self.callback(RabbitMessage(message))
         except Exception:
             LOG.exception(_("Failed to process message"
                             " ... skipping it."))
             message.ack()
 
-    def consume(self, *args, **kwargs):
-        """Actually declare the consumer on the amqp channel.  This will
-        start the flow of messages from the queue.  Using the
-        Connection.consume() will process the messages,
-        calling the appropriate callback.
-
-        If a callback is specified in kwargs, use that.  Otherwise,
-        use the callback passed during __init__()
-
-        If kwargs['nowait'] is True, then this call will block until
-        a message is read.
-
-        """
-
-        options = {'consumer_tag': self.tag}
-        options['nowait'] = kwargs.get('nowait', False)
-        callback = kwargs.get('callback', self.callback)
-        if not callback:
-            raise ValueError("No callback defined")
-
-        def _callback(message):
-            m2p = getattr(self.channel, 'message_to_python', None)
-            if m2p:
-                message = m2p(message)
-            self._callback_handler(message, callback)
-
-        self.queue.consume(*args, callback=_callback, **options)
-
-
-class DirectConsumer(ConsumerBase):
-    """Queue/consumer class for 'direct'."""
-
-    def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
-        """Init a 'direct' queue.
-
-        'channel' is the amqp channel to use
-        'msg_id' is the msg_id to listen on
-        'callback' is the callback to call when messages are received
-        'tag' is a unique ID for the consumer on the channel
-
-        Other kombu options may be passed
-        """
-        # Default options
-        options = {'durable': False,
-                   'queue_arguments': _get_queue_arguments(conf),
-                   'auto_delete': True,
-                   'exclusive': False}
-        options.update(kwargs)
-        exchange = kombu.entity.Exchange(name=msg_id,
-                                         type='direct',
-                                         durable=options['durable'],
-                                         auto_delete=options['auto_delete'])
-        super(DirectConsumer, self).__init__(channel,
-                                             callback,
-                                             tag,
-                                             name=msg_id,
-                                             exchange=exchange,
-                                             routing_key=msg_id,
-                                             **options)
-
-
-class TopicConsumer(ConsumerBase):
-    """Consumer class for 'topic'."""
-
-    def __init__(self, conf, channel, topic, callback, tag, exchange_name,
-                 name=None, **kwargs):
-        """Init a 'topic' queue.
-
-        :param channel: the amqp channel to use
-        :param topic: the topic to listen on
-        :paramtype topic: str
-        :param callback: the callback to call when messages are received
-        :param tag: a unique ID for the consumer on the channel
-        :param exchange_name: the exchange name to use
-        :param name: optional queue name, defaults to topic
-        :paramtype name: str
-
-        Other kombu options may be passed as keyword arguments
-        """
-        # Default options
-        options = {'durable': conf.amqp_durable_queues,
-                   'queue_arguments': _get_queue_arguments(conf),
-                   'auto_delete': conf.amqp_auto_delete,
-                   'exclusive': False}
-        options.update(kwargs)
-        exchange = kombu.entity.Exchange(name=exchange_name,
-                                         type='topic',
-                                         durable=options['durable'],
-                                         auto_delete=options['auto_delete'])
-        super(TopicConsumer, self).__init__(channel,
-                                            callback,
-                                            tag,
-                                            name=name or topic,
-                                            exchange=exchange,
-                                            routing_key=topic,
-                                            **options)
-
-
-class FanoutConsumer(ConsumerBase):
-    """Consumer class for 'fanout'."""
-
-    def __init__(self, conf, channel, topic, callback, tag, **kwargs):
-        """Init a 'fanout' queue.
-
-        'channel' is the amqp channel to use
-        'topic' is the topic to listen on
-        'callback' is the callback to call when messages are received
-        'tag' is a unique ID for the consumer on the channel
-
-        Other kombu options may be passed
-        """
-        unique = uuid.uuid4().hex
-        exchange_name = '%s_fanout' % topic
-        queue_name = '%s_fanout_%s' % (topic, unique)
-
-        # Default options
-        options = {'durable': False,
-                   'queue_arguments': _get_queue_arguments(conf),
-                   'auto_delete': True,
-                   'exclusive': False}
-        options.update(kwargs)
-        exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
-                                         durable=options['durable'],
-                                         auto_delete=options['auto_delete'])
-        super(FanoutConsumer, self).__init__(channel, callback, tag,
-                                             name=queue_name,
-                                             exchange=exchange,
-                                             routing_key=topic,
-                                             **options)
-
 
 class Publisher(object):
     """Publisher that silently creates exchange but no queues."""
@@ -371,7 +266,6 @@ class Publisher(object):
         self.durable = durable
         self.exchange = kombu.entity.Exchange(name=self.exchange_name,
                                               type=type,
-                                              exclusive=False,
                                               durable=durable,
                                               auto_delete=auto_delete,
                                               passive=self.passive)
@@ -573,7 +467,6 @@ class Connection(object):
 
     def __init__(self, conf, url, purpose):
         self.consumers = []
-        self.consumer_num = itertools.count(1)
         self.conf = conf
         self.driver_conf = self.conf.oslo_messaging_rabbit
         self.max_retries = self.driver_conf.rabbit_max_retries
@@ -812,9 +705,8 @@ class Connection(object):
             a new channel, we use it the reconfigure our consumers.
             """
             self._set_current_channel(new_channel)
-            self.consumer_num = itertools.count(1)
             for consumer in self.consumers:
-                consumer.reconnect(new_channel)
+                consumer.declare(new_channel)
 
             LOG.info(_LI('Reconnected to AMQP server on '
                          '%(hostname)s:%(port)d'),
@@ -889,7 +781,6 @@ class Connection(object):
                 self._set_current_channel(None)
                 self.ensure_connection()
         self.consumers = []
-        self.consumer_num = itertools.count(1)
 
     def _heartbeat_supported_and_enabled(self):
         if self.driver_conf.heartbeat_timeout_threshold <= 0:
@@ -956,19 +847,18 @@ class Connection(object):
                 timeout=self._heartbeat_wait_timeout)
         self._heartbeat_exit_event.clear()
 
-    def declare_consumer(self, consumer_cls, topic, callback):
+    def declare_consumer(self, consumer):
         """Create a Consumer using the class that was passed in and
         add it to our list of consumers
         """
 
         def _connect_error(exc):
-            log_info = {'topic': topic, 'err_str': exc}
+            log_info = {'topic': consumer.routing_key, 'err_str': exc}
             LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
                       "%(err_str)s"), log_info)
 
         def _declare_consumer():
-            consumer = consumer_cls(self.driver_conf, self.channel, topic,
-                                    callback, six.next(self.consumer_num))
+            consumer.declare(self.channel)
             self.consumers.append(consumer)
             return consumer
 
@@ -997,11 +887,8 @@ class Connection(object):
 
         def _consume():
             if self.do_consume:
-                queues_head = self.consumers[:-1]  # not fanout.
-                queues_tail = self.consumers[-1]  # fanout
-                for queue in queues_head:
-                    queue.consume(nowait=True)
-                queues_tail.consume(nowait=False)
+                for tag, consumer in enumerate(self.consumers):
+                    consumer.consume(tag=tag)
                 self.do_consume = False
 
             poll_timeout = (self._poll_timeout if timeout is None
@@ -1045,20 +932,50 @@ class Connection(object):
         In nova's use, this is generally a msg_id queue used for
         responses for call/multicall
         """
-        self.declare_consumer(DirectConsumer, topic, callback)
+
+        consumer = Consumer(self.driver_conf,
+                            exchange_name=topic,
+                            queue_name=topic,
+                            routing_key=topic,
+                            type='direct',
+                            durable=False,
+                            auto_delete=True,
+                            callback=callback)
+
+        self.declare_consumer(consumer)
 
     def declare_topic_consumer(self, exchange_name, topic, callback=None,
                                queue_name=None):
         """Create a 'topic' consumer."""
-        self.declare_consumer(functools.partial(TopicConsumer,
-                                                name=queue_name,
-                                                exchange_name=exchange_name,
-                                                ),
-                              topic, callback)
+        consumer = Consumer(self.driver_conf,
+                            exchange_name=exchange_name,
+                            queue_name=queue_name or topic,
+                            routing_key=topic,
+                            type='topic',
+                            durable=self.driver_conf.amqp_durable_queues,
+                            auto_delete=self.driver_conf.amqp_auto_delete,
+                            callback=callback)
+
+        self.declare_consumer(consumer)
 
     def declare_fanout_consumer(self, topic, callback):
         """Create a 'fanout' consumer."""
-        self.declare_consumer(FanoutConsumer, topic, callback)
+
+        unique = uuid.uuid4().hex
+        exchange_name = '%s_fanout' % topic
+        queue_name = '%s_fanout_%s' % (topic, unique)
+
+        consumer = Consumer(self.driver_conf,
+                            exchange_name=exchange_name,
+                            queue_name=queue_name,
+                            routing_key=topic,
+                            type='fanout',
+                            durable=False,
+                            auto_delete=True,
+                            callback=callback,
+                            nowait=False)
+
+        self.declare_consumer(consumer)
 
     def direct_send(self, msg_id, msg):
         """Send a 'direct' message."""