Merge "kafka: remove no really implemented feature"

This commit is contained in:
Jenkins 2017-01-06 09:08:15 +00:00 committed by Gerrit Code Review
commit a6820de99c

@ -129,11 +129,6 @@ class Connection(object):
self.group_id = driver_conf.consumer_group
self.url = url
self._parse_url()
# TODO(Support for manual/auto_commit functionality)
# When auto_commit is False, consumer can manually notify
# the completion of the subscription.
# Currently we don't support for non auto commit option
self.auto_commit = True
self._consume_loop_stopped = False
def _parse_url(self):
@ -229,17 +224,6 @@ class Connection(object):
self.consumer.close()
self.consumer = None
def commit(self):
"""Commit is used by subscribers belonging to the same group.
After subscribing messages, commit is called to prevent
the other subscribers which belong to the same group
from re-subscribing the same messages.
Currently self.auto_commit option is always True,
so we don't need to call this function.
"""
self.consumer.commit()
def _close_producer(self):
with self.producer_lock:
if self.producer:
@ -260,6 +244,10 @@ class Connection(object):
@with_reconnect()
def declare_topic_consumer(self, topics, group=None):
# TODO(Support for manual/auto_commit functionality)
# When auto_commit is False, consumer can manually notify
# the completion of the subscription.
# Currently we don't support for non auto commit option
self.consumer = kafka.KafkaConsumer(
*topics, group_id=(group or self.group_id),
bootstrap_servers=self.hostaddrs,
@ -308,14 +296,6 @@ class KafkaListener(base.PollStyleListener):
def cleanup(self):
self.conn.close()
def commit(self):
# TODO(Support for manually/auto commit functionality)
# It's better to allow users to commit manually and support for
# self.auto_commit = False option. For now, this commit function
# is meaningless since user couldn't call this function and
# auto_commit option is always True.
self.conn.commit()
class KafkaDriver(base.BaseDriver):
"""Note: Current implementation of this driver is experimental.