From 22f240b82fffbd62be8568a7d0d3369134596ace Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Herv=C3=A9=20Beraud?= <hberaud@redhat.com>
Date: Tue, 4 Jun 2019 17:45:59 +0200
Subject: [PATCH] Allow users run the rabbitmq heartbeat inside a standard
 pthread.

This is an experimental feature.

The proposed changes will fix related issues when we run
heartbeat under apache/httpd enviornment with the apache MPM `prefork`
[1] engine and mod_wsgi or uwsgi in a monkey patched environment.

Propose changes to allow user to choose to run the rabbitmq health check
heartbeat in a standard python thread.

Issue
=====

We facing an issue with the rabbitmq driver heartbeat
under apache MPM `prefork` module and mod_wsgi when nova_api monkey
patched the stdlib by using eventlet.

nova_api calling eventlet.monkey_patch() [2] when it runs under mod_wsgi.

This impacts the AMQP heartbeat thread,
which is meant to be a native thread. Instead of checking AMQP sockets
every 15s, It is now suspended and resumed by eventlet. However,
resuming greenthreads can take a very long time if mod_wsgi isn't
processing traffic regularly, which can cause rabbitmq to close the AMQP
connection.

Root Cause
==========

The oslo.messaging RabbitMQ driver and especially the heartbeat
suffer to inherit the execution model of the service which consume him.

In this scenario nova_api need green threads to manage cells and edge
features so nova_api monkey patch the stdlib to obtain async features,
and the oslo.messaging rabbitmq driver endure these changes.

I think the main issue here is that nova_api want async and use eventlet green
threads to obtain it.

Solution
========

We want to allow user to isolate the heartbeat execution model
from the parent process inherited execution model by passing the
`heartbeat_in_pthread` option through the driver config.

While we use MPM `prefork` we want to avoid to use libevent and epoll.

If the `heartbeat_in_pthread` option is given we want to force to use the
python stdlib threading module to run the
rabbitmq heartbeat to avoid issue related to a non "standard"
environment. I mean "standard" because async features isn't the default
config in mostly case, starting by apache which define `prefork` is the
default engine.

This is an experimental feature, we can help us to ensure to run heartbeat
through a classical python thread

Specifications
==============

- https://review.opendev.org/661314

[1] https://httpd.apache.org/docs/2.4/fr/mod/prefork.html
[2] https://github.com/openstack/nova/commit/3c5e2b0e9fac985294a949852bb8c83d4ed77e04

Change-Id: If8846599efc48fe18ecfb99c04e2c38f9a45b9ed
---
 oslo_messaging/_drivers/impl_rabbit.py        | 52 ++++++++++++++++++-
 .../tests/drivers/test_impl_rabbit.py         |  6 +++
 2 files changed, 57 insertions(+), 1 deletion(-)

diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 6158300df..b35fe466e 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -33,6 +33,7 @@ import kombu.messaging
 from oslo_config import cfg
 from oslo_log import log as logging
 from oslo_utils import eventletutils
+from oslo_utils import importutils
 import six
 import six.moves
 from six.moves.urllib import parse
@@ -46,6 +47,18 @@ from oslo_messaging._drivers import pool
 from oslo_messaging import _utils
 from oslo_messaging import exceptions
 
+eventlet = importutils.try_import('eventlet')
+if eventlet and eventletutils.is_monkey_patched("thread"):
+    # Here we initialize module with the native python threading module
+    # if it was already monkey patched by eventlet/greenlet.
+    stdlib_threading = eventlet.patcher.original('threading')
+else:
+    # Manage the case where we run this driver in a non patched environment
+    # and where user even so configure the driver to run heartbeat through
+    # a python thread, if we don't do that when the heartbeat will start
+    # we will facing an issue by trying to override the threading module.
+    stdlib_threading = threading
+
 # NOTE(sileht): don't exists in py2 socket module
 TCP_USER_TIMEOUT = 18
 
@@ -76,6 +89,15 @@ rabbit_opts = [
                deprecated_name='kombu_ssl_ca_certs',
                help='SSL certification authority file '
                     '(valid only if SSL enabled).'),
+    cfg.BoolOpt('heartbeat_in_pthread',
+                default=False,
+                help="EXPERIMENTAL: Run the health check heartbeat thread"
+                     "through a native python thread. By default if this"
+                     "option isn't provided the  health check heartbeat will"
+                     "inherit the execution model from the parent process. By"
+                     "example if the parent process have monkey patched the"
+                     "stdlib by using eventlet/greenlet then the heartbeat"
+                     "will be run through a green thread."),
     cfg.FloatOpt('kombu_reconnect_delay',
                  default=1.0,
                  deprecated_group='DEFAULT',
@@ -443,6 +465,34 @@ class Connection(object):
             driver_conf.kombu_missing_consumer_retry_timeout
         self.kombu_failover_strategy = driver_conf.kombu_failover_strategy
         self.kombu_compression = driver_conf.kombu_compression
+        self.heartbeat_in_pthread = driver_conf.heartbeat_in_pthread
+
+        if self.heartbeat_in_pthread:
+            # NOTE(hberaud): Experimental: threading module is in use to run
+            # the rabbitmq health check heartbeat. in some situation like
+            # with nova-api, nova need green threads to run the cells
+            # mechanismes in an async mode, so they used eventlet and
+            # greenlet to monkey patch the python stdlib and get green threads.
+            # The issue here is that nova-api run under the apache MPM prefork
+            # module and mod_wsgi. The apache prefork module doesn't support
+            # epoll and recent kernel features, and evenlet is built over epoll
+            # and libevent, so when we run the rabbitmq heartbeat we inherit
+            # from the execution model of the parent process (nova-api), and
+            # in this case we will run the heartbeat through a green thread.
+            # We want to allow users to choose between pthread and
+            # green threads if needed in some specific situations.
+            # This experimental feature allow user to use pthread in an env
+            # that doesn't support eventlet without forcing the parent process
+            # to stop to use eventlet if they need monkey patching for some
+            # specific reasons.
+            # If users want to use pthread we need to make sure that we
+            # will use the *native* threading module for
+            # initialize the heartbeat thread.
+            # Here we override globaly the previously imported
+            # threading module with the native python threading module
+            # if it was already monkey patched by eventlet/greenlet.
+            global threading
+            threading = stdlib_threading
 
         if self.ssl:
             self.ssl_version = driver_conf.ssl_version
@@ -896,7 +946,7 @@ class Connection(object):
 
     def _heartbeat_start(self):
         if self._heartbeat_supported_and_enabled():
-            self._heartbeat_exit_event = eventletutils.Event()
+            self._heartbeat_exit_event = threading.Event()
             self._heartbeat_thread = threading.Thread(
                 target=self._heartbeat_thread_job)
             self._heartbeat_thread.daemon = True
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index a18da1535..214658f49 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -89,6 +89,11 @@ class TestHeartbeat(test_utils.BaseTestCase):
             info='A recoverable connection/channel error occurred, '
             'trying to reconnect: %s')
 
+    def test_run_heartbeat_in_pthread(self):
+        self.config(heartbeat_in_pthread=True,
+                    group="oslo_messaging_rabbit")
+        self._do_test_heartbeat_sent()
+
 
 class TestRabbitQos(test_utils.BaseTestCase):
 
@@ -997,6 +1002,7 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
 
 class ConnectionLockTestCase(test_utils.BaseTestCase):
     def _thread(self, lock, sleep, heartbeat=False):
+
         def thread_task():
             if heartbeat:
                 with lock.for_heartbeat():