From cd63a7023519b82a46b3f29f2691ff32c5085c9f Mon Sep 17 00:00:00 2001
From: Davanum Srinivas <davanum@gmail.com>
Date: Wed, 27 May 2015 09:32:15 -0400
Subject: [PATCH] Adding Publisher Acknowledgements/confirms

During the vancouver summit, Michael Klishin from
pivotal proposed we should use publisher confirms
as the default setting to reduce messages being
dropped on the floor.

https://www.rabbitmq.com/confirms.html
http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/

This setting is a lightweight way of keeping track of which messages
have been processed by the broker and which would need re-publishing
in case of broker shutdown or network failure.

Change-Id: I25bc955df130dad4725f5281211d37fd73e7ea63
---
 oslo_messaging/_drivers/impl_rabbit.py           | 3 ++-
 oslo_messaging/tests/drivers/test_impl_rabbit.py | 3 ++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 65acef9d1..9f627f077 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -437,7 +437,8 @@ class Connection(object):
             self._url, ssl=self._fetch_ssl_params(),
             login_method=self._login_method,
             failover_strategy="shuffle",
-            heartbeat=self.driver_conf.heartbeat_timeout_threshold)
+            heartbeat=self.driver_conf.heartbeat_timeout_threshold,
+            transport_options={'confirm_publish': True})
 
         LOG.info(_LI('Connecting to AMQP server on %(hostname)s:%(port)s'),
                  self.connection.info())
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index c8f58bdbe..5ad75786b 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -169,7 +169,8 @@ class TestRabbitDriverLoadSSL(test_utils.BaseTestCase):
 
         transport._driver._get_connection()
         connection_klass.assert_called_once_with(
-            'memory:///', ssl=self.expected, login_method='AMQPLAIN',
+            'memory:///', transport_options={'confirm_publish': True},
+            ssl=self.expected, login_method='AMQPLAIN',
             heartbeat=0, failover_strategy="shuffle")