diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index d12430cdc..ca76aaae2 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -33,6 +33,7 @@ import kombu.messaging from oslo_config import cfg from oslo_log import log as logging from oslo_utils import eventletutils +from oslo_utils import importutils import six import six.moves from six.moves.urllib import parse @@ -46,6 +47,18 @@ from oslo_messaging._drivers import pool from oslo_messaging import _utils from oslo_messaging import exceptions +eventlet = importutils.try_import('eventlet') +if eventlet and eventletutils.is_monkey_patched("thread"): + # Here we initialize module with the native python threading module + # if it was already monkey patched by eventlet/greenlet. + stdlib_threading = eventlet.patcher.original('threading') +else: + # Manage the case where we run this driver in a non patched environment + # and where user even so configure the driver to run heartbeat through + # a python thread, if we don't do that when the heartbeat will start + # we will facing an issue by trying to override the threading module. + stdlib_threading = threading + # NOTE(sileht): don't exists in py2 socket module TCP_USER_TIMEOUT = 18 @@ -75,6 +88,15 @@ rabbit_opts = [ deprecated_name='kombu_ssl_ca_certs', help='SSL certification authority file ' '(valid only if SSL enabled).'), + cfg.BoolOpt('heartbeat_in_pthread', + default=False, + help="EXPERIMENTAL: Run the health check heartbeat thread" + "through a native python thread. By default if this" + "option isn't provided the health check heartbeat will" + "inherit the execution model from the parent process. By" + "example if the parent process have monkey patched the" + "stdlib by using eventlet/greenlet then the heartbeat" + "will be run through a green thread."), cfg.FloatOpt('kombu_reconnect_delay', default=1.0, deprecated_group='DEFAULT', @@ -442,6 +464,34 @@ class Connection(object): driver_conf.kombu_missing_consumer_retry_timeout self.kombu_failover_strategy = driver_conf.kombu_failover_strategy self.kombu_compression = driver_conf.kombu_compression + self.heartbeat_in_pthread = driver_conf.heartbeat_in_pthread + + if self.heartbeat_in_pthread: + # NOTE(hberaud): Experimental: threading module is in use to run + # the rabbitmq health check heartbeat. in some situation like + # with nova-api, nova need green threads to run the cells + # mechanismes in an async mode, so they used eventlet and + # greenlet to monkey patch the python stdlib and get green threads. + # The issue here is that nova-api run under the apache MPM prefork + # module and mod_wsgi. The apache prefork module doesn't support + # epoll and recent kernel features, and evenlet is built over epoll + # and libevent, so when we run the rabbitmq heartbeat we inherit + # from the execution model of the parent process (nova-api), and + # in this case we will run the heartbeat through a green thread. + # We want to allow users to choose between pthread and + # green threads if needed in some specific situations. + # This experimental feature allow user to use pthread in an env + # that doesn't support eventlet without forcing the parent process + # to stop to use eventlet if they need monkey patching for some + # specific reasons. + # If users want to use pthread we need to make sure that we + # will use the *native* threading module for + # initialize the heartbeat thread. + # Here we override globaly the previously imported + # threading module with the native python threading module + # if it was already monkey patched by eventlet/greenlet. + global threading + threading = stdlib_threading if self.ssl: self.ssl_version = driver_conf.ssl_version @@ -905,7 +955,7 @@ class Connection(object): def _heartbeat_start(self): if self._heartbeat_supported_and_enabled(): - self._heartbeat_exit_event = eventletutils.Event() + self._heartbeat_exit_event = threading.Event() self._heartbeat_thread = threading.Thread( target=self._heartbeat_thread_job, name="Rabbit-heartbeat") self._heartbeat_thread.daemon = True diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index 5f302db30..ce6d11d7c 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -89,6 +89,11 @@ class TestHeartbeat(test_utils.BaseTestCase): info='A recoverable connection/channel error occurred, ' 'trying to reconnect: %s') + def test_run_heartbeat_in_pthread(self): + self.config(heartbeat_in_pthread=True, + group="oslo_messaging_rabbit") + self._do_test_heartbeat_sent() + class TestRabbitQos(test_utils.BaseTestCase): @@ -999,6 +1004,7 @@ class RpcKombuHATestCase(test_utils.BaseTestCase): class ConnectionLockTestCase(test_utils.BaseTestCase): def _thread(self, lock, sleep, heartbeat=False): + def thread_task(): if heartbeat: with lock.for_heartbeat():