From 8ee5ae135a6ecb918f40619982e3dc7e38ed0bbf Mon Sep 17 00:00:00 2001
From: Mehdi Abaakouk <sileht@redhat.com>
Date: Thu, 1 Jun 2017 10:28:23 +0200
Subject: [PATCH] Fix rabbitmq driver with blocking executor

We recently move ack/requeue of messages in main/polling thread
of rabbitmq drivers. And break the blocking executor.

This one is not tested by any tests and now deprecated.

This change workaround the issue until we completely remove the
blocking executor.

Change-Id: Id479100f6ff364cf67a199e9b70f9f0c7bf7e1a9
Closes-bug: #1694728
---
 oslo_messaging/_drivers/amqpdriver.py | 20 +++++++++++++++++---
 oslo_messaging/server.py              | 12 ++++++++++++
 2 files changed, 29 insertions(+), 3 deletions(-)

diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index c5613b07a..539e48b0b 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -56,6 +56,13 @@ class MessageOperationsHandler(object):
             target=self._process_in_background)
         self._shutdown_thread.daemon = True
 
+        # HACK(sileht): this is set by the server.Server temporary
+        # to not have to rewrite the entire internal API to pass
+        # executor everywhere to make Listener aware of the server
+        # executor. All this hack is only for the blocking executor.
+        # And it's deprecated so...
+        self._executor = None
+
     def stop(self):
         self._shutdown.set()
 
@@ -85,9 +92,16 @@ class MessageOperationsHandler(object):
 
     def do(self, task):
         "Put the task in the queue and waits until the task is completed."
-        event = threading.Event()
-        self._tasks.put((task, event))
-        event.wait()
+        if self._executor is None:
+            raise RuntimeError("Unexpected error, no executor is setuped")
+        elif self._executor == "blocking":
+            # NOTE(sileht): Blocking will hang forever if we waiting the
+            # polling thread
+            task()
+        else:
+            event = threading.Event()
+            self._tasks.put((task, event))
+            event.wait()
 
 
 class AMQPIncomingMessage(base.RpcIncomingMessage):
diff --git a/oslo_messaging/server.py b/oslo_messaging/server.py
index d2e50ac46..c8e77a673 100644
--- a/oslo_messaging/server.py
+++ b/oslo_messaging/server.py
@@ -417,6 +417,18 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
         except driver_base.TransportDriverError as ex:
             raise ServerListenError(self.target, ex)
 
+        # HACK(sileht): We temporary pass the executor to the rabbit
+        # listener to fix a race with the deprecated blocking executor.
+        # We do this hack because this is need only for 'synchronous'
+        # executor like blocking. And this one is deprecated. Making
+        # driver working in an sync and an async way is complicated
+        # and blocking have 0% tests coverage.
+        if hasattr(self.listener, '_poll_style_listener'):
+            l = self.listener._poll_style_listener
+            if hasattr(l, "_message_operations_handler"):
+                l._message_operations_handler._executor = (
+                    self.executor_type)
+
         self.listener.start(self._on_incoming)
 
     @ordered(after='start')