rabbit: remove unused consumer interfaces
The consumer code is over engineered, it allows to override everything, but the override is always done with functools.partial. None of the child Class have the same signature, sometimes the constructor use the parameter name as the parent class but for a different purpose, that makes the code hard to read. It's was never clear which options is passed to the queue and the exchange at this end to kombu. This changes removes all of that stuffs, and only use the kombu terminology for consumer parameters. Alse we don't hardcode anymore the tag and the channel in the consumer class, to allow to change them without recreating a consumer object in the futur. Change-Id: Ie341f0c973adbda9a342cb836867345aa42652d1
This commit is contained in:
parent
2d81577fa1
commit
b737a92afd
@ -13,8 +13,6 @@
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
import functools
|
||||
import itertools
|
||||
import logging
|
||||
import os
|
||||
import socket
|
||||
@ -171,30 +169,42 @@ class RabbitMessage(dict):
|
||||
self._raw_message.requeue()
|
||||
|
||||
|
||||
class ConsumerBase(object):
|
||||
"""Consumer base class."""
|
||||
class Consumer(object):
|
||||
"""Consumer class."""
|
||||
|
||||
def __init__(self, channel, callback, tag, **kwargs):
|
||||
"""Declare a queue on an amqp channel.
|
||||
|
||||
'channel' is the amqp channel to use
|
||||
'callback' is the callback to call when messages are received
|
||||
'tag' is a unique ID for the consumer on the channel
|
||||
|
||||
queue name, exchange name, and other kombu options are
|
||||
passed in here as a dictionary.
|
||||
def __init__(self, conf, exchange_name, queue_name, routing_key, type,
|
||||
durable, auto_delete, callback, nowait=True):
|
||||
"""Init the Publisher class with the exchange_name, routing_key,
|
||||
type, durable auto_delete
|
||||
"""
|
||||
self.queue_name = queue_name
|
||||
self.exchange_name = exchange_name
|
||||
self.routing_key = routing_key
|
||||
self.auto_delete = auto_delete
|
||||
self.durable = durable
|
||||
self.callback = callback
|
||||
self.tag = six.text_type(tag)
|
||||
self.kwargs = kwargs
|
||||
self.queue = None
|
||||
self.reconnect(channel)
|
||||
self.type = type
|
||||
self.nowait = nowait
|
||||
self.queue_arguments = _get_queue_arguments(conf)
|
||||
|
||||
self.queue = None
|
||||
self.exchange = kombu.entity.Exchange(
|
||||
name=exchange_name,
|
||||
type=type,
|
||||
durable=self.durable,
|
||||
auto_delete=self.auto_delete)
|
||||
|
||||
def declare(self, channel):
|
||||
"""Re-declare the queue after a rabbit (re)connect."""
|
||||
self.queue = kombu.entity.Queue(
|
||||
name=self.queue_name,
|
||||
channel=channel,
|
||||
exchange=self.exchange,
|
||||
durable=self.durable,
|
||||
auto_delete=self.auto_delete,
|
||||
routing_key=self.routing_key,
|
||||
queue_arguments=self.queue_arguments)
|
||||
|
||||
def reconnect(self, channel):
|
||||
"""Re-declare the queue after a rabbit reconnect."""
|
||||
self.channel = channel
|
||||
self.kwargs['channel'] = channel
|
||||
self.queue = kombu.entity.Queue(**self.kwargs)
|
||||
try:
|
||||
self.queue.declare()
|
||||
except Exception as e:
|
||||
@ -210,149 +220,34 @@ class ConsumerBase(object):
|
||||
LOG.error(_("Declaring queue failed with (%s), retrying"), e)
|
||||
self.queue.declare()
|
||||
|
||||
def _callback_handler(self, message, callback):
|
||||
def consume(self, tag):
|
||||
"""Actually declare the consumer on the amqp channel. This will
|
||||
start the flow of messages from the queue. Using the
|
||||
Connection.consume() will process the messages,
|
||||
calling the appropriate callback.
|
||||
"""
|
||||
|
||||
self.queue.consume(callback=self._callback,
|
||||
consumer_tag=six.text_type(tag),
|
||||
nowait=self.nowait)
|
||||
|
||||
def _callback(self, message):
|
||||
"""Call callback with deserialized message.
|
||||
|
||||
Messages that are processed and ack'ed.
|
||||
"""
|
||||
|
||||
m2p = getattr(self.queue.channel, 'message_to_python', None)
|
||||
if m2p:
|
||||
message = m2p(message)
|
||||
|
||||
try:
|
||||
callback(RabbitMessage(message))
|
||||
self.callback(RabbitMessage(message))
|
||||
except Exception:
|
||||
LOG.exception(_("Failed to process message"
|
||||
" ... skipping it."))
|
||||
message.ack()
|
||||
|
||||
def consume(self, *args, **kwargs):
|
||||
"""Actually declare the consumer on the amqp channel. This will
|
||||
start the flow of messages from the queue. Using the
|
||||
Connection.consume() will process the messages,
|
||||
calling the appropriate callback.
|
||||
|
||||
If a callback is specified in kwargs, use that. Otherwise,
|
||||
use the callback passed during __init__()
|
||||
|
||||
If kwargs['nowait'] is True, then this call will block until
|
||||
a message is read.
|
||||
|
||||
"""
|
||||
|
||||
options = {'consumer_tag': self.tag}
|
||||
options['nowait'] = kwargs.get('nowait', False)
|
||||
callback = kwargs.get('callback', self.callback)
|
||||
if not callback:
|
||||
raise ValueError("No callback defined")
|
||||
|
||||
def _callback(message):
|
||||
m2p = getattr(self.channel, 'message_to_python', None)
|
||||
if m2p:
|
||||
message = m2p(message)
|
||||
self._callback_handler(message, callback)
|
||||
|
||||
self.queue.consume(*args, callback=_callback, **options)
|
||||
|
||||
|
||||
class DirectConsumer(ConsumerBase):
|
||||
"""Queue/consumer class for 'direct'."""
|
||||
|
||||
def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
|
||||
"""Init a 'direct' queue.
|
||||
|
||||
'channel' is the amqp channel to use
|
||||
'msg_id' is the msg_id to listen on
|
||||
'callback' is the callback to call when messages are received
|
||||
'tag' is a unique ID for the consumer on the channel
|
||||
|
||||
Other kombu options may be passed
|
||||
"""
|
||||
# Default options
|
||||
options = {'durable': False,
|
||||
'queue_arguments': _get_queue_arguments(conf),
|
||||
'auto_delete': True,
|
||||
'exclusive': False}
|
||||
options.update(kwargs)
|
||||
exchange = kombu.entity.Exchange(name=msg_id,
|
||||
type='direct',
|
||||
durable=options['durable'],
|
||||
auto_delete=options['auto_delete'])
|
||||
super(DirectConsumer, self).__init__(channel,
|
||||
callback,
|
||||
tag,
|
||||
name=msg_id,
|
||||
exchange=exchange,
|
||||
routing_key=msg_id,
|
||||
**options)
|
||||
|
||||
|
||||
class TopicConsumer(ConsumerBase):
|
||||
"""Consumer class for 'topic'."""
|
||||
|
||||
def __init__(self, conf, channel, topic, callback, tag, exchange_name,
|
||||
name=None, **kwargs):
|
||||
"""Init a 'topic' queue.
|
||||
|
||||
:param channel: the amqp channel to use
|
||||
:param topic: the topic to listen on
|
||||
:paramtype topic: str
|
||||
:param callback: the callback to call when messages are received
|
||||
:param tag: a unique ID for the consumer on the channel
|
||||
:param exchange_name: the exchange name to use
|
||||
:param name: optional queue name, defaults to topic
|
||||
:paramtype name: str
|
||||
|
||||
Other kombu options may be passed as keyword arguments
|
||||
"""
|
||||
# Default options
|
||||
options = {'durable': conf.amqp_durable_queues,
|
||||
'queue_arguments': _get_queue_arguments(conf),
|
||||
'auto_delete': conf.amqp_auto_delete,
|
||||
'exclusive': False}
|
||||
options.update(kwargs)
|
||||
exchange = kombu.entity.Exchange(name=exchange_name,
|
||||
type='topic',
|
||||
durable=options['durable'],
|
||||
auto_delete=options['auto_delete'])
|
||||
super(TopicConsumer, self).__init__(channel,
|
||||
callback,
|
||||
tag,
|
||||
name=name or topic,
|
||||
exchange=exchange,
|
||||
routing_key=topic,
|
||||
**options)
|
||||
|
||||
|
||||
class FanoutConsumer(ConsumerBase):
|
||||
"""Consumer class for 'fanout'."""
|
||||
|
||||
def __init__(self, conf, channel, topic, callback, tag, **kwargs):
|
||||
"""Init a 'fanout' queue.
|
||||
|
||||
'channel' is the amqp channel to use
|
||||
'topic' is the topic to listen on
|
||||
'callback' is the callback to call when messages are received
|
||||
'tag' is a unique ID for the consumer on the channel
|
||||
|
||||
Other kombu options may be passed
|
||||
"""
|
||||
unique = uuid.uuid4().hex
|
||||
exchange_name = '%s_fanout' % topic
|
||||
queue_name = '%s_fanout_%s' % (topic, unique)
|
||||
|
||||
# Default options
|
||||
options = {'durable': False,
|
||||
'queue_arguments': _get_queue_arguments(conf),
|
||||
'auto_delete': True,
|
||||
'exclusive': False}
|
||||
options.update(kwargs)
|
||||
exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
|
||||
durable=options['durable'],
|
||||
auto_delete=options['auto_delete'])
|
||||
super(FanoutConsumer, self).__init__(channel, callback, tag,
|
||||
name=queue_name,
|
||||
exchange=exchange,
|
||||
routing_key=topic,
|
||||
**options)
|
||||
|
||||
|
||||
class Publisher(object):
|
||||
"""Publisher that silently creates exchange but no queues."""
|
||||
@ -371,7 +266,6 @@ class Publisher(object):
|
||||
self.durable = durable
|
||||
self.exchange = kombu.entity.Exchange(name=self.exchange_name,
|
||||
type=type,
|
||||
exclusive=False,
|
||||
durable=durable,
|
||||
auto_delete=auto_delete,
|
||||
passive=self.passive)
|
||||
@ -573,7 +467,6 @@ class Connection(object):
|
||||
|
||||
def __init__(self, conf, url, purpose):
|
||||
self.consumers = []
|
||||
self.consumer_num = itertools.count(1)
|
||||
self.conf = conf
|
||||
self.driver_conf = self.conf.oslo_messaging_rabbit
|
||||
self.max_retries = self.driver_conf.rabbit_max_retries
|
||||
@ -812,9 +705,8 @@ class Connection(object):
|
||||
a new channel, we use it the reconfigure our consumers.
|
||||
"""
|
||||
self._set_current_channel(new_channel)
|
||||
self.consumer_num = itertools.count(1)
|
||||
for consumer in self.consumers:
|
||||
consumer.reconnect(new_channel)
|
||||
consumer.declare(new_channel)
|
||||
|
||||
LOG.info(_LI('Reconnected to AMQP server on '
|
||||
'%(hostname)s:%(port)d'),
|
||||
@ -889,7 +781,6 @@ class Connection(object):
|
||||
self._set_current_channel(None)
|
||||
self.ensure_connection()
|
||||
self.consumers = []
|
||||
self.consumer_num = itertools.count(1)
|
||||
|
||||
def _heartbeat_supported_and_enabled(self):
|
||||
if self.driver_conf.heartbeat_timeout_threshold <= 0:
|
||||
@ -956,19 +847,18 @@ class Connection(object):
|
||||
timeout=self._heartbeat_wait_timeout)
|
||||
self._heartbeat_exit_event.clear()
|
||||
|
||||
def declare_consumer(self, consumer_cls, topic, callback):
|
||||
def declare_consumer(self, consumer):
|
||||
"""Create a Consumer using the class that was passed in and
|
||||
add it to our list of consumers
|
||||
"""
|
||||
|
||||
def _connect_error(exc):
|
||||
log_info = {'topic': topic, 'err_str': exc}
|
||||
log_info = {'topic': consumer.routing_key, 'err_str': exc}
|
||||
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
|
||||
"%(err_str)s"), log_info)
|
||||
|
||||
def _declare_consumer():
|
||||
consumer = consumer_cls(self.driver_conf, self.channel, topic,
|
||||
callback, six.next(self.consumer_num))
|
||||
consumer.declare(self.channel)
|
||||
self.consumers.append(consumer)
|
||||
return consumer
|
||||
|
||||
@ -997,11 +887,8 @@ class Connection(object):
|
||||
|
||||
def _consume():
|
||||
if self.do_consume:
|
||||
queues_head = self.consumers[:-1] # not fanout.
|
||||
queues_tail = self.consumers[-1] # fanout
|
||||
for queue in queues_head:
|
||||
queue.consume(nowait=True)
|
||||
queues_tail.consume(nowait=False)
|
||||
for tag, consumer in enumerate(self.consumers):
|
||||
consumer.consume(tag=tag)
|
||||
self.do_consume = False
|
||||
|
||||
poll_timeout = (self._poll_timeout if timeout is None
|
||||
@ -1045,20 +932,50 @@ class Connection(object):
|
||||
In nova's use, this is generally a msg_id queue used for
|
||||
responses for call/multicall
|
||||
"""
|
||||
self.declare_consumer(DirectConsumer, topic, callback)
|
||||
|
||||
consumer = Consumer(self.driver_conf,
|
||||
exchange_name=topic,
|
||||
queue_name=topic,
|
||||
routing_key=topic,
|
||||
type='direct',
|
||||
durable=False,
|
||||
auto_delete=True,
|
||||
callback=callback)
|
||||
|
||||
self.declare_consumer(consumer)
|
||||
|
||||
def declare_topic_consumer(self, exchange_name, topic, callback=None,
|
||||
queue_name=None):
|
||||
"""Create a 'topic' consumer."""
|
||||
self.declare_consumer(functools.partial(TopicConsumer,
|
||||
name=queue_name,
|
||||
exchange_name=exchange_name,
|
||||
),
|
||||
topic, callback)
|
||||
consumer = Consumer(self.driver_conf,
|
||||
exchange_name=exchange_name,
|
||||
queue_name=queue_name or topic,
|
||||
routing_key=topic,
|
||||
type='topic',
|
||||
durable=self.driver_conf.amqp_durable_queues,
|
||||
auto_delete=self.driver_conf.amqp_auto_delete,
|
||||
callback=callback)
|
||||
|
||||
self.declare_consumer(consumer)
|
||||
|
||||
def declare_fanout_consumer(self, topic, callback):
|
||||
"""Create a 'fanout' consumer."""
|
||||
self.declare_consumer(FanoutConsumer, topic, callback)
|
||||
|
||||
unique = uuid.uuid4().hex
|
||||
exchange_name = '%s_fanout' % topic
|
||||
queue_name = '%s_fanout_%s' % (topic, unique)
|
||||
|
||||
consumer = Consumer(self.driver_conf,
|
||||
exchange_name=exchange_name,
|
||||
queue_name=queue_name,
|
||||
routing_key=topic,
|
||||
type='fanout',
|
||||
durable=False,
|
||||
auto_delete=True,
|
||||
callback=callback,
|
||||
nowait=False)
|
||||
|
||||
self.declare_consumer(consumer)
|
||||
|
||||
def direct_send(self, msg_id, msg):
|
||||
"""Send a 'direct' message."""
|
||||
|
Loading…
x
Reference in New Issue
Block a user