Merge "Remove the partial implementation of ack_on_error"

This commit is contained in:
Jenkins 2013-12-06 17:34:06 +00:00 committed by Gerrit Code Review
commit 875acfc29b
3 changed files with 9 additions and 31 deletions

View File

@ -164,13 +164,11 @@ class ConnectionContext(rpc_common.Connection):
def create_worker(self, topic, proxy, pool_name):
self.connection.create_worker(topic, proxy, pool_name)
def join_consumer_pool(self, callback, pool_name, topic, exchange_name,
ack_on_error=True):
def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
self.connection.join_consumer_pool(callback,
pool_name,
topic,
exchange_name,
ack_on_error)
exchange_name)
def consume_in_thread(self):
self.connection.consume_in_thread()

View File

@ -189,7 +189,6 @@ class ConsumerBase(object):
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
finally:
# TODO(sandy): Need support for optional ack_on_error.
self.session.acknowledge(message)
def get_receiver(self):
@ -729,7 +728,7 @@ class Connection(object):
return consumer
def join_consumer_pool(self, callback, pool_name, topic,
exchange_name=None, ack_on_error=True):
exchange_name=None):
"""Register as a member of a group of consumers for a given topic from
the specified exchange.

View File

@ -134,7 +134,6 @@ class ConsumerBase(object):
self.tag = str(tag)
self.kwargs = kwargs
self.queue = None
self.ack_on_error = kwargs.get('ack_on_error', True)
self.reconnect(channel)
def reconnect(self, channel):
@ -147,32 +146,16 @@ class ConsumerBase(object):
def _callback_handler(self, message, callback):
"""Call callback with deserialized message.
Messages that are processed without exception are ack'ed.
If the message processing generates an exception, it will be
ack'ed if ack_on_error=True. Otherwise it will be .reject()'ed.
Rejection is better than waiting for the message to timeout.
Rejected messages are immediately requeued.
Messages that are processed and ack'ed.
"""
ack_msg = False
try:
msg = rpc_common.deserialize_msg(message.payload)
callback(msg)
ack_msg = True
except Exception:
if self.ack_on_error:
ack_msg = True
LOG.exception(_("Failed to process message"
" ... skipping it."))
else:
LOG.exception(_("Failed to process message"
" ... will requeue."))
finally:
if ack_msg:
message.ack()
else:
message.reject()
LOG.exception(_("Failed to process message"
" ... skipping it."))
message.ack()
def consume(self, *args, **kwargs):
"""Actually declare the consumer on the amqp channel. This will
@ -708,12 +691,11 @@ class Connection(object):
self.declare_consumer(DirectConsumer, topic, callback)
def declare_topic_consumer(self, topic, callback=None, queue_name=None,
exchange_name=None, ack_on_error=True):
exchange_name=None):
"""Create a 'topic' consumer."""
self.declare_consumer(functools.partial(TopicConsumer,
name=queue_name,
exchange_name=exchange_name,
ack_on_error=ack_on_error,
),
topic, callback)
@ -779,7 +761,7 @@ class Connection(object):
self.declare_topic_consumer(topic, proxy_cb, pool_name)
def join_consumer_pool(self, callback, pool_name, topic,
exchange_name=None, ack_on_error=True):
exchange_name=None):
"""Register as a member of a group of consumers for a given topic from
the specified exchange.
@ -800,7 +782,6 @@ class Connection(object):
topic=topic,
exchange_name=exchange_name,
callback=callback_wrapper,
ack_on_error=ack_on_error,
)