diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 5dfe3ab1e..1ae65af0d 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -13,8 +13,6 @@ # under the License. import contextlib -import functools -import itertools import logging import os import socket @@ -171,30 +169,42 @@ class RabbitMessage(dict): self._raw_message.requeue() -class ConsumerBase(object): - """Consumer base class.""" +class Consumer(object): + """Consumer class.""" - def __init__(self, channel, callback, tag, **kwargs): - """Declare a queue on an amqp channel. - - 'channel' is the amqp channel to use - 'callback' is the callback to call when messages are received - 'tag' is a unique ID for the consumer on the channel - - queue name, exchange name, and other kombu options are - passed in here as a dictionary. + def __init__(self, conf, exchange_name, queue_name, routing_key, type, + durable, auto_delete, callback, nowait=True): + """Init the Publisher class with the exchange_name, routing_key, + type, durable auto_delete """ + self.queue_name = queue_name + self.exchange_name = exchange_name + self.routing_key = routing_key + self.auto_delete = auto_delete + self.durable = durable self.callback = callback - self.tag = six.text_type(tag) - self.kwargs = kwargs - self.queue = None - self.reconnect(channel) + self.type = type + self.nowait = nowait + self.queue_arguments = _get_queue_arguments(conf) + + self.queue = None + self.exchange = kombu.entity.Exchange( + name=exchange_name, + type=type, + durable=self.durable, + auto_delete=self.auto_delete) + + def declare(self, channel): + """Re-declare the queue after a rabbit (re)connect.""" + self.queue = kombu.entity.Queue( + name=self.queue_name, + channel=channel, + exchange=self.exchange, + durable=self.durable, + auto_delete=self.auto_delete, + routing_key=self.routing_key, + queue_arguments=self.queue_arguments) - def reconnect(self, channel): - """Re-declare the queue after a rabbit reconnect.""" - self.channel = channel - self.kwargs['channel'] = channel - self.queue = kombu.entity.Queue(**self.kwargs) try: self.queue.declare() except Exception as e: @@ -210,149 +220,34 @@ class ConsumerBase(object): LOG.error(_("Declaring queue failed with (%s), retrying"), e) self.queue.declare() - def _callback_handler(self, message, callback): + def consume(self, tag): + """Actually declare the consumer on the amqp channel. This will + start the flow of messages from the queue. Using the + Connection.consume() will process the messages, + calling the appropriate callback. + """ + + self.queue.consume(callback=self._callback, + consumer_tag=six.text_type(tag), + nowait=self.nowait) + + def _callback(self, message): """Call callback with deserialized message. Messages that are processed and ack'ed. """ + m2p = getattr(self.queue.channel, 'message_to_python', None) + if m2p: + message = m2p(message) + try: - callback(RabbitMessage(message)) + self.callback(RabbitMessage(message)) except Exception: LOG.exception(_("Failed to process message" " ... skipping it.")) message.ack() - def consume(self, *args, **kwargs): - """Actually declare the consumer on the amqp channel. This will - start the flow of messages from the queue. Using the - Connection.consume() will process the messages, - calling the appropriate callback. - - If a callback is specified in kwargs, use that. Otherwise, - use the callback passed during __init__() - - If kwargs['nowait'] is True, then this call will block until - a message is read. - - """ - - options = {'consumer_tag': self.tag} - options['nowait'] = kwargs.get('nowait', False) - callback = kwargs.get('callback', self.callback) - if not callback: - raise ValueError("No callback defined") - - def _callback(message): - m2p = getattr(self.channel, 'message_to_python', None) - if m2p: - message = m2p(message) - self._callback_handler(message, callback) - - self.queue.consume(*args, callback=_callback, **options) - - -class DirectConsumer(ConsumerBase): - """Queue/consumer class for 'direct'.""" - - def __init__(self, conf, channel, msg_id, callback, tag, **kwargs): - """Init a 'direct' queue. - - 'channel' is the amqp channel to use - 'msg_id' is the msg_id to listen on - 'callback' is the callback to call when messages are received - 'tag' is a unique ID for the consumer on the channel - - Other kombu options may be passed - """ - # Default options - options = {'durable': False, - 'queue_arguments': _get_queue_arguments(conf), - 'auto_delete': True, - 'exclusive': False} - options.update(kwargs) - exchange = kombu.entity.Exchange(name=msg_id, - type='direct', - durable=options['durable'], - auto_delete=options['auto_delete']) - super(DirectConsumer, self).__init__(channel, - callback, - tag, - name=msg_id, - exchange=exchange, - routing_key=msg_id, - **options) - - -class TopicConsumer(ConsumerBase): - """Consumer class for 'topic'.""" - - def __init__(self, conf, channel, topic, callback, tag, exchange_name, - name=None, **kwargs): - """Init a 'topic' queue. - - :param channel: the amqp channel to use - :param topic: the topic to listen on - :paramtype topic: str - :param callback: the callback to call when messages are received - :param tag: a unique ID for the consumer on the channel - :param exchange_name: the exchange name to use - :param name: optional queue name, defaults to topic - :paramtype name: str - - Other kombu options may be passed as keyword arguments - """ - # Default options - options = {'durable': conf.amqp_durable_queues, - 'queue_arguments': _get_queue_arguments(conf), - 'auto_delete': conf.amqp_auto_delete, - 'exclusive': False} - options.update(kwargs) - exchange = kombu.entity.Exchange(name=exchange_name, - type='topic', - durable=options['durable'], - auto_delete=options['auto_delete']) - super(TopicConsumer, self).__init__(channel, - callback, - tag, - name=name or topic, - exchange=exchange, - routing_key=topic, - **options) - - -class FanoutConsumer(ConsumerBase): - """Consumer class for 'fanout'.""" - - def __init__(self, conf, channel, topic, callback, tag, **kwargs): - """Init a 'fanout' queue. - - 'channel' is the amqp channel to use - 'topic' is the topic to listen on - 'callback' is the callback to call when messages are received - 'tag' is a unique ID for the consumer on the channel - - Other kombu options may be passed - """ - unique = uuid.uuid4().hex - exchange_name = '%s_fanout' % topic - queue_name = '%s_fanout_%s' % (topic, unique) - - # Default options - options = {'durable': False, - 'queue_arguments': _get_queue_arguments(conf), - 'auto_delete': True, - 'exclusive': False} - options.update(kwargs) - exchange = kombu.entity.Exchange(name=exchange_name, type='fanout', - durable=options['durable'], - auto_delete=options['auto_delete']) - super(FanoutConsumer, self).__init__(channel, callback, tag, - name=queue_name, - exchange=exchange, - routing_key=topic, - **options) - class Publisher(object): """Publisher that silently creates exchange but no queues.""" @@ -371,7 +266,6 @@ class Publisher(object): self.durable = durable self.exchange = kombu.entity.Exchange(name=self.exchange_name, type=type, - exclusive=False, durable=durable, auto_delete=auto_delete, passive=self.passive) @@ -573,7 +467,6 @@ class Connection(object): def __init__(self, conf, url, purpose): self.consumers = [] - self.consumer_num = itertools.count(1) self.conf = conf self.driver_conf = self.conf.oslo_messaging_rabbit self.max_retries = self.driver_conf.rabbit_max_retries @@ -812,9 +705,8 @@ class Connection(object): a new channel, we use it the reconfigure our consumers. """ self._set_current_channel(new_channel) - self.consumer_num = itertools.count(1) for consumer in self.consumers: - consumer.reconnect(new_channel) + consumer.declare(new_channel) LOG.info(_LI('Reconnected to AMQP server on ' '%(hostname)s:%(port)d'), @@ -889,7 +781,6 @@ class Connection(object): self._set_current_channel(None) self.ensure_connection() self.consumers = [] - self.consumer_num = itertools.count(1) def _heartbeat_supported_and_enabled(self): if self.driver_conf.heartbeat_timeout_threshold <= 0: @@ -956,19 +847,18 @@ class Connection(object): timeout=self._heartbeat_wait_timeout) self._heartbeat_exit_event.clear() - def declare_consumer(self, consumer_cls, topic, callback): + def declare_consumer(self, consumer): """Create a Consumer using the class that was passed in and add it to our list of consumers """ def _connect_error(exc): - log_info = {'topic': topic, 'err_str': exc} + log_info = {'topic': consumer.routing_key, 'err_str': exc} LOG.error(_("Failed to declare consumer for topic '%(topic)s': " "%(err_str)s"), log_info) def _declare_consumer(): - consumer = consumer_cls(self.driver_conf, self.channel, topic, - callback, six.next(self.consumer_num)) + consumer.declare(self.channel) self.consumers.append(consumer) return consumer @@ -997,11 +887,8 @@ class Connection(object): def _consume(): if self.do_consume: - queues_head = self.consumers[:-1] # not fanout. - queues_tail = self.consumers[-1] # fanout - for queue in queues_head: - queue.consume(nowait=True) - queues_tail.consume(nowait=False) + for tag, consumer in enumerate(self.consumers): + consumer.consume(tag=tag) self.do_consume = False poll_timeout = (self._poll_timeout if timeout is None @@ -1045,20 +932,50 @@ class Connection(object): In nova's use, this is generally a msg_id queue used for responses for call/multicall """ - self.declare_consumer(DirectConsumer, topic, callback) + + consumer = Consumer(self.driver_conf, + exchange_name=topic, + queue_name=topic, + routing_key=topic, + type='direct', + durable=False, + auto_delete=True, + callback=callback) + + self.declare_consumer(consumer) def declare_topic_consumer(self, exchange_name, topic, callback=None, queue_name=None): """Create a 'topic' consumer.""" - self.declare_consumer(functools.partial(TopicConsumer, - name=queue_name, - exchange_name=exchange_name, - ), - topic, callback) + consumer = Consumer(self.driver_conf, + exchange_name=exchange_name, + queue_name=queue_name or topic, + routing_key=topic, + type='topic', + durable=self.driver_conf.amqp_durable_queues, + auto_delete=self.driver_conf.amqp_auto_delete, + callback=callback) + + self.declare_consumer(consumer) def declare_fanout_consumer(self, topic, callback): """Create a 'fanout' consumer.""" - self.declare_consumer(FanoutConsumer, topic, callback) + + unique = uuid.uuid4().hex + exchange_name = '%s_fanout' % topic + queue_name = '%s_fanout_%s' % (topic, unique) + + consumer = Consumer(self.driver_conf, + exchange_name=exchange_name, + queue_name=queue_name, + routing_key=topic, + type='fanout', + durable=False, + auto_delete=True, + callback=callback, + nowait=False) + + self.declare_consumer(consumer) def direct_send(self, msg_id, msg): """Send a 'direct' message."""