From 0400cbf4f83cf8d58076c7e65e08a156ec3508a8 Mon Sep 17 00:00:00 2001
From: Chet Burgess <cfb@metacloud.com>
Date: Fri, 28 Feb 2014 13:39:09 -0800
Subject: [PATCH] Gracefully handle consumer cancel notifications

With mirrored queues and clustered rabbit nodes a queue is still
mastered by a single rabbit node. When the rabbit node dies an
election occurs amongst the remaining nodes and a new master is
elected. When a slave is promoted to master it will close all the
open channels to its consumers but it will not close the
connections. This is reported to consumers as a consumer cancel
notification (CCN). Consumers need to re-subscribe to these queues
when they recieve a CCN.

kombu 2.1.4+ reports CCNs as channel errors. This patch updates
the ensure function to be more inline with the upstream kombu
functionality. We now monitor for channel errors as well as
connection errors and initiate a reconnect if we detect an error.

Change-Id: Ie00f67e65250dc983fa45877c14091ad4ae136b4
Partial-Bug: 856764
---
 oslo/messaging/_drivers/impl_rabbit.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py
index 0bc6a8bda..e49834ca9 100644
--- a/oslo/messaging/_drivers/impl_rabbit.py
+++ b/oslo/messaging/_drivers/impl_rabbit.py
@@ -511,6 +511,7 @@ class Connection(object):
             self.connection = None
         self.connection = kombu.connection.BrokerConnection(**params)
         self.connection_errors = self.connection.connection_errors
+        self.channel_errors = self.connection.channel_errors
         if self.memory_transport:
             # Kludge to speed up tests.
             self.connection.transport.polling_interval = 0.0
@@ -588,6 +589,9 @@ class Connection(object):
             except self.connection_errors as e:
                 if error_callback:
                     error_callback(e)
+            except self.channel_errors as e:
+                if error_callback:
+                    error_callback(e)
             except (socket.timeout, IOError) as e:
                 if error_callback:
                     error_callback(e)