From eb21f6b2634b976fe65837c52741a6c0c3ef1fe5 Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk <mehdi.abaakouk@enovance.com> Date: Thu, 27 Nov 2014 14:46:52 +0100 Subject: [PATCH] Warn user if needed when the process is forked This change warns the library consumer when the process if forked and we can't be sure that the library work as expected. This also add some documentation about forking oslo.messaging Transport object. Change-Id: I2938421775aa72866adac198d70214856d45e165 Related-bug: #1330199 --- doc/source/transport.rst | 14 ++++++++++++++ oslo/messaging/_drivers/impl_qpid.py | 17 +++++++++++++++++ oslo/messaging/_drivers/impl_rabbit.py | 11 +++++++++++ 3 files changed, 42 insertions(+) diff --git a/doc/source/transport.rst b/doc/source/transport.rst index f914269d9..fc6a7cfb1 100644 --- a/doc/source/transport.rst +++ b/doc/source/transport.rst @@ -14,3 +14,17 @@ Transport .. autoclass:: TransportHost .. autofunction:: set_transport_defaults + + +About fork oslo.messaging transport object +------------------------------------------ + +oslo.messaging can't ensure that forking a process that shares the same +transport object is safe for the library consumer, because it relies on +different 3rd party libraries that don't ensure that too, but in certain +case/driver it works: + +* rabbit: works only if no connection have already been established. +* qpid: doesn't work (qpid library have a global state that use fd + that can't be resetted) +* amqp1: works diff --git a/oslo/messaging/_drivers/impl_qpid.py b/oslo/messaging/_drivers/impl_qpid.py index 28bde7ca9..25083c35c 100644 --- a/oslo/messaging/_drivers/impl_qpid.py +++ b/oslo/messaging/_drivers/impl_qpid.py @@ -16,6 +16,7 @@ import functools import itertools import logging +import os import random import time @@ -490,6 +491,7 @@ class Connection(object): random.shuffle(self.brokers_params) self.brokers = itertools.cycle(self.brokers_params) + self._initial_pid = os.getpid() self.reconnect() def _connect(self, broker): @@ -578,6 +580,21 @@ class Connection(object): LOG.debug("Re-established AMQP queues") def ensure(self, error_callback, method, retry=None): + + current_pid = os.getpid() + if self._initial_pid != current_pid: + # NOTE(sileht): + # to get the same level of fork support that rabbit driver have + # (ie: allow fork before the first connection established) + # we could use the kombu workaround: + # https://github.com/celery/kombu/blob/master/kombu/transport/ + # qpid_patches.py#L67 + LOG.warn("Process forked! " + "This can results to unpredictable behavior. " + "See: http://docs.openstack.org/developer/" + "oslo.messaging/transport.html") + self._initial_pid = current_pid + while True: try: return method() diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py index 939a3cec2..2fda9b40c 100644 --- a/oslo/messaging/_drivers/impl_rabbit.py +++ b/oslo/messaging/_drivers/impl_rabbit.py @@ -15,6 +15,7 @@ import functools import itertools import logging +import os import socket import ssl import time @@ -472,6 +473,8 @@ class Connection(object): hostname, port, virtual_host) + self._initial_pid = os.getpid() + self.do_consume = True self.channel = None @@ -553,6 +556,14 @@ class Connection(object): retry = N means N retries """ + current_pid = os.getpid() + if self._initial_pid != current_pid: + LOG.warn("Process forked after connection established! " + "This can results to unpredictable behavior. " + "See: http://docs.openstack.org/developer/" + "oslo.messaging/transport.html") + self._initial_pid = current_pid + if retry is None: retry = self.max_retries if retry is None or retry < 0: