From fcd51a67d18a9e947ae5f57eafa43ac756d1a5a8 Mon Sep 17 00:00:00 2001
From: Nicolas Simonds <nic@metacloud.com>
Date: Wed, 26 Feb 2014 15:21:01 -0800
Subject: [PATCH] Slow down Kombu reconnect attempts

For a rationale for this patch, see the discussion surrounding Bug

When reconnecting to a RabbitMQ cluster with mirrored queues in
use, the attempt to release the connection can hang "indefinitely"
somewhere deep down in Kombu.  Blocking the thread for a bit
prior to release seems to kludge around the problem where it is
otherwise reproduceable.

DocImpact

Change-Id: Ic2ede3046709b831adf8204e4c909c589c1786c4
Partial-Bug: 856764
---
 oslo/messaging/_drivers/impl_rabbit.py | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py
index e49834ca9..a369489cd 100644
--- a/oslo/messaging/_drivers/impl_rabbit.py
+++ b/oslo/messaging/_drivers/impl_rabbit.py
@@ -53,6 +53,10 @@ rabbit_opts = [
                default='',
                help=('SSL certification authority file '
                      '(valid only if SSL enabled).')),
+    cfg.FloatOpt('kombu_reconnect_delay',
+                 default=1.0,
+                 help='How long to wait before reconnecting in response to an '
+                      'AMQP consumer cancel notification.'),
     cfg.StrOpt('rabbit_host',
                default='localhost',
                help='The RabbitMQ broker address where a single node is '
@@ -503,6 +507,17 @@ class Connection(object):
             LOG.info(_("Reconnecting to AMQP server on "
                      "%(hostname)s:%(port)d") % params)
             try:
+                # XXX(nic): when reconnecting to a RabbitMQ cluster
+                # with mirrored queues in use, the attempt to release the
+                # connection can hang "indefinitely" somewhere deep down
+                # in Kombu.  Blocking the thread for a bit prior to
+                # release seems to kludge around the problem where it is
+                # otherwise reproduceable.
+                if self.conf.kombu_reconnect_delay > 0:
+                    LOG.info(_("Delaying reconnect for %1.1f seconds...") %
+                             self.conf.kombu_reconnect_delay)
+                    time.sleep(self.conf.kombu_reconnect_delay)
+
                 self.connection.release()
             except self.connection_errors:
                 pass