diff --git a/oslo/messaging/_drivers/amqp.py b/oslo/messaging/_drivers/amqp.py index 874775ddf..d6ee03a17 100644 --- a/oslo/messaging/_drivers/amqp.py +++ b/oslo/messaging/_drivers/amqp.py @@ -164,13 +164,11 @@ class ConnectionContext(rpc_common.Connection): def create_worker(self, topic, proxy, pool_name): self.connection.create_worker(topic, proxy, pool_name) - def join_consumer_pool(self, callback, pool_name, topic, exchange_name, - ack_on_error=True): + def join_consumer_pool(self, callback, pool_name, topic, exchange_name): self.connection.join_consumer_pool(callback, pool_name, topic, - exchange_name, - ack_on_error) + exchange_name) def consume_in_thread(self): self.connection.consume_in_thread() diff --git a/oslo/messaging/_drivers/impl_qpid.py b/oslo/messaging/_drivers/impl_qpid.py index 76ccc6113..baa040374 100644 --- a/oslo/messaging/_drivers/impl_qpid.py +++ b/oslo/messaging/_drivers/impl_qpid.py @@ -189,7 +189,6 @@ class ConsumerBase(object): except Exception: LOG.exception(_("Failed to process message... skipping it.")) finally: - # TODO(sandy): Need support for optional ack_on_error. self.session.acknowledge(message) def get_receiver(self): @@ -729,7 +728,7 @@ class Connection(object): return consumer def join_consumer_pool(self, callback, pool_name, topic, - exchange_name=None, ack_on_error=True): + exchange_name=None): """Register as a member of a group of consumers for a given topic from the specified exchange. diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py index 9fdffc4ee..dccde7e2b 100644 --- a/oslo/messaging/_drivers/impl_rabbit.py +++ b/oslo/messaging/_drivers/impl_rabbit.py @@ -134,7 +134,6 @@ class ConsumerBase(object): self.tag = str(tag) self.kwargs = kwargs self.queue = None - self.ack_on_error = kwargs.get('ack_on_error', True) self.reconnect(channel) def reconnect(self, channel): @@ -147,32 +146,16 @@ class ConsumerBase(object): def _callback_handler(self, message, callback): """Call callback with deserialized message. - Messages that are processed without exception are ack'ed. - - If the message processing generates an exception, it will be - ack'ed if ack_on_error=True. Otherwise it will be .reject()'ed. - Rejection is better than waiting for the message to timeout. - Rejected messages are immediately requeued. + Messages that are processed and ack'ed. """ - ack_msg = False try: msg = rpc_common.deserialize_msg(message.payload) callback(msg) - ack_msg = True except Exception: - if self.ack_on_error: - ack_msg = True - LOG.exception(_("Failed to process message" - " ... skipping it.")) - else: - LOG.exception(_("Failed to process message" - " ... will requeue.")) - finally: - if ack_msg: - message.ack() - else: - message.reject() + LOG.exception(_("Failed to process message" + " ... skipping it.")) + message.ack() def consume(self, *args, **kwargs): """Actually declare the consumer on the amqp channel. This will @@ -708,12 +691,11 @@ class Connection(object): self.declare_consumer(DirectConsumer, topic, callback) def declare_topic_consumer(self, topic, callback=None, queue_name=None, - exchange_name=None, ack_on_error=True): + exchange_name=None): """Create a 'topic' consumer.""" self.declare_consumer(functools.partial(TopicConsumer, name=queue_name, exchange_name=exchange_name, - ack_on_error=ack_on_error, ), topic, callback) @@ -779,7 +761,7 @@ class Connection(object): self.declare_topic_consumer(topic, proxy_cb, pool_name) def join_consumer_pool(self, callback, pool_name, topic, - exchange_name=None, ack_on_error=True): + exchange_name=None): """Register as a member of a group of consumers for a given topic from the specified exchange. @@ -800,7 +782,6 @@ class Connection(object): topic=topic, exchange_name=exchange_name, callback=callback_wrapper, - ack_on_error=ack_on_error, )