From d09bf21897b202968cce0d03c1d39d1d7ecff149 Mon Sep 17 00:00:00 2001
From: Andrew Smith <ansmith@redhat.com>
Date: Tue, 9 Jan 2018 14:33:23 -0500
Subject: [PATCH] Add support for synchronous commit

This patch changes the default driver behavior to synchronously
commit messages following consumer poll. A configuration option
will enable the auto commit for asynchronous commit if desired.

Depends-On: I5b4f01c928373cac530aa6877a34c684577bc64e
Change-Id: I92a3dc95c5d424aa722138195fef5a855a66b31d
---
 oslo_messaging/_drivers/impl_kafka.py                | 12 ++++++++----
 .../_drivers/kafka_driver/kafka_options.py           |  9 ++++++++-
 oslo_messaging/tests/drivers/test_impl_kafka.py      |  2 ++
 3 files changed, 18 insertions(+), 5 deletions(-)

diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py
index 676b1adba..ec3296b5c 100644
--- a/oslo_messaging/_drivers/impl_kafka.py
+++ b/oslo_messaging/_drivers/impl_kafka.py
@@ -153,6 +153,8 @@ class ConsumerConnection(Connection):
         self.consumer_timeout = driver_conf.kafka_consumer_timeout
         self.max_fetch_bytes = driver_conf.kafka_max_fetch_bytes
         self.group_id = driver_conf.consumer_group
+        self.enable_auto_commit = driver_conf.enable_auto_commit
+        self.max_poll_records = driver_conf.max_poll_records
         self._consume_loop_stopped = False
 
     @with_reconnect()
@@ -165,6 +167,10 @@ class ConsumerConnection(Connection):
             # NOTE(sileht): really ? you return payload but no messages...
             # simulate timeout to consume message again
             raise kafka.errors.ConsumerTimeout()
+
+        if not self.enable_auto_commit:
+            self.consumer.commit()
+
         return messages
 
     def consume(self, timeout=None):
@@ -204,14 +210,12 @@ class ConsumerConnection(Connection):
 
     @with_reconnect()
     def declare_topic_consumer(self, topics, group=None):
-        # TODO(Support for manual/auto_commit functionality)
-        # When auto_commit is False, consumer can manually notify
-        # the completion of the subscription.
-        # Currently we don't support for non auto commit option
         self.consumer = kafka.KafkaConsumer(
             *topics, group_id=(group or self.group_id),
+            enable_auto_commit=self.enable_auto_commit,
             bootstrap_servers=self.hostaddrs,
             max_partition_fetch_bytes=self.max_fetch_bytes,
+            max_poll_records=self.max_poll_records,
             selector=KAFKA_SELECTOR
         )
 
diff --git a/oslo_messaging/_drivers/kafka_driver/kafka_options.py b/oslo_messaging/_drivers/kafka_driver/kafka_options.py
index 398f707c5..e435866d0 100644
--- a/oslo_messaging/_drivers/kafka_driver/kafka_options.py
+++ b/oslo_messaging/_drivers/kafka_driver/kafka_options.py
@@ -56,7 +56,14 @@ KAFKA_OPTS = [
                       "in seconds"),
 
     cfg.IntOpt('producer_batch_size', default=16384,
-               help='Size of batch for the producer async send')
+               help='Size of batch for the producer async send'),
+
+    cfg.BoolOpt('enable_auto_commit',
+                default=False,
+                help='Enable asynchronous consumer commits'),
+
+    cfg.IntOpt('max_poll_records', default=500,
+               help='The maximum number of records returned in a poll call')
 ]
 
 
diff --git a/oslo_messaging/tests/drivers/test_impl_kafka.py b/oslo_messaging/tests/drivers/test_impl_kafka.py
index f2f6f7f73..a34ce639b 100644
--- a/oslo_messaging/tests/drivers/test_impl_kafka.py
+++ b/oslo_messaging/tests/drivers/test_impl_kafka.py
@@ -115,8 +115,10 @@ class TestKafkaDriver(test_utils.BaseTestCase):
                 targets_and_priorities, "kafka_test", 1000, 10)
             consumer.assert_called_once_with(
                 *expected_topics, group_id="kafka_test",
+                enable_auto_commit=mock.ANY,
                 bootstrap_servers=['localhost:9092'],
                 max_partition_fetch_bytes=mock.ANY,
+                max_poll_records=mock.ANY,
                 selector=mock.ANY
             )