Fix deadlock when logging from a tpool thread.
The object server runs certain IO-intensive methods outside the main pthread for performance. If one of those methods tries to log, this can cause a crash that eventually leads to an object server with hundreds or thousands of greenthreads, all deadlocked. The short version of the story is that logging.SysLogHandler has a mutex which Eventlet monkey-patches. However, the monkey-patched mutex sometimes breaks if used across different pthreads, and it breaks in such a way that it is still considered held. After that happens, any attempt to emit a log message blocks the calling greenthread forever. The fix is to use a mutex that works across different greenlets and across different pthreads. This patch introduces such a lock based on an anonymous pipe. Change-Id: I57decefaf5bbed57b97a62d0df8518b112917480 Closes-Bug: 1710328
This commit is contained in:
parent
bf09a06708
commit
6d160797fc
@ -50,9 +50,12 @@ import stat
|
||||
import datetime
|
||||
|
||||
import eventlet
|
||||
import eventlet.debug
|
||||
import eventlet.greenthread
|
||||
import eventlet.semaphore
|
||||
from eventlet import GreenPool, sleep, Timeout, tpool
|
||||
from eventlet.green import socket, threading
|
||||
from eventlet.hubs import trampoline
|
||||
import eventlet.queue
|
||||
import netifaces
|
||||
import codecs
|
||||
@ -1894,17 +1897,18 @@ def get_logger(conf, name=None, log_to_console=False, log_route=None,
|
||||
if udp_host:
|
||||
udp_port = int(conf.get('log_udp_port',
|
||||
logging.handlers.SYSLOG_UDP_PORT))
|
||||
handler = SysLogHandler(address=(udp_host, udp_port),
|
||||
facility=facility)
|
||||
handler = ThreadSafeSysLogHandler(address=(udp_host, udp_port),
|
||||
facility=facility)
|
||||
else:
|
||||
log_address = conf.get('log_address', '/dev/log')
|
||||
try:
|
||||
handler = SysLogHandler(address=log_address, facility=facility)
|
||||
handler = ThreadSafeSysLogHandler(address=log_address,
|
||||
facility=facility)
|
||||
except socket.error as e:
|
||||
# Either /dev/log isn't a UNIX socket or it does not exist at all
|
||||
if e.errno not in [errno.ENOTSOCK, errno.ENOENT]:
|
||||
raise
|
||||
handler = SysLogHandler(facility=facility)
|
||||
handler = ThreadSafeSysLogHandler(facility=facility)
|
||||
handler.setFormatter(formatter)
|
||||
logger.addHandler(handler)
|
||||
get_logger.handler4logger[logger] = handler
|
||||
@ -4304,3 +4308,123 @@ def replace_partition_in_path(path, part_power):
|
||||
path_components[-4] = "%d" % part
|
||||
|
||||
return os.sep.join(path_components)
|
||||
|
||||
|
||||
class PipeMutex(object):
|
||||
"""
|
||||
Mutex using a pipe. Works across both greenlets and real threads, even
|
||||
at the same time.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.rfd, self.wfd = os.pipe()
|
||||
|
||||
# You can't create a pipe in non-blocking mode; you must set it
|
||||
# later.
|
||||
rflags = fcntl.fcntl(self.rfd, fcntl.F_GETFL)
|
||||
fcntl.fcntl(self.rfd, fcntl.F_SETFL, rflags | os.O_NONBLOCK)
|
||||
os.write(self.wfd, b'-') # start unlocked
|
||||
|
||||
self.owner = None
|
||||
self.recursion_depth = 0
|
||||
|
||||
# Usually, it's an error to have multiple greenthreads all waiting
|
||||
# to read the same file descriptor. It's often a sign of inadequate
|
||||
# concurrency control; for example, if you have two greenthreads
|
||||
# trying to use the same memcache connection, they'll end up writing
|
||||
# interleaved garbage to the socket or stealing part of each others'
|
||||
# responses.
|
||||
#
|
||||
# In this case, we have multiple greenthreads waiting on the same
|
||||
# file descriptor by design. This lets greenthreads in real thread A
|
||||
# wait with greenthreads in real thread B for the same mutex.
|
||||
# Therefore, we must turn off eventlet's multiple-reader detection.
|
||||
#
|
||||
# It would be better to turn off multiple-reader detection for only
|
||||
# our calls to trampoline(), but eventlet does not support that.
|
||||
eventlet.debug.hub_prevent_multiple_readers(False)
|
||||
|
||||
def acquire(self, blocking=True):
|
||||
"""
|
||||
Acquire the mutex.
|
||||
|
||||
If called with blocking=False, returns True if the mutex was
|
||||
acquired and False if it wasn't. Otherwise, blocks until the mutex
|
||||
is acquired and returns True.
|
||||
|
||||
This lock is recursive; the same greenthread may acquire it as many
|
||||
times as it wants to, though it must then release it that many times
|
||||
too.
|
||||
"""
|
||||
current_greenthread_id = id(eventlet.greenthread.getcurrent())
|
||||
if self.owner == current_greenthread_id:
|
||||
self.recursion_depth += 1
|
||||
return True
|
||||
|
||||
while True:
|
||||
try:
|
||||
# If there is a byte available, this will read it and remove
|
||||
# it from the pipe. If not, this will raise OSError with
|
||||
# errno=EAGAIN.
|
||||
os.read(self.rfd, 1)
|
||||
self.owner = current_greenthread_id
|
||||
return True
|
||||
except OSError as err:
|
||||
if err.errno != errno.EAGAIN:
|
||||
raise
|
||||
|
||||
if not blocking:
|
||||
return False
|
||||
|
||||
# Tell eventlet to suspend the current greenthread until
|
||||
# self.rfd becomes readable. This will happen when someone
|
||||
# else writes to self.wfd.
|
||||
trampoline(self.rfd, read=True)
|
||||
|
||||
def release(self):
|
||||
"""
|
||||
Release the mutex.
|
||||
"""
|
||||
current_greenthread_id = id(eventlet.greenthread.getcurrent())
|
||||
if self.owner != current_greenthread_id:
|
||||
raise RuntimeError("cannot release un-acquired lock")
|
||||
|
||||
if self.recursion_depth > 0:
|
||||
self.recursion_depth -= 1
|
||||
return
|
||||
|
||||
self.owner = None
|
||||
os.write(self.wfd, b'X')
|
||||
|
||||
def close(self):
|
||||
"""
|
||||
Close the mutex. This releases its file descriptors.
|
||||
|
||||
You can't use a mutex after it's been closed.
|
||||
"""
|
||||
if self.wfd is not None:
|
||||
os.close(self.rfd)
|
||||
self.rfd = None
|
||||
os.close(self.wfd)
|
||||
self.wfd = None
|
||||
self.owner = None
|
||||
self.recursion_depth = 0
|
||||
|
||||
def __del__(self):
|
||||
# We need this so we don't leak file descriptors. Otherwise, if you
|
||||
# call get_logger() and don't explicitly dispose of it by calling
|
||||
# logger.logger.handlers[0].lock.close() [1], the pipe file
|
||||
# descriptors are leaked.
|
||||
#
|
||||
# This only really comes up in tests. Swift processes tend to call
|
||||
# get_logger() once and then hang on to it until they exit, but the
|
||||
# test suite calls get_logger() a lot.
|
||||
#
|
||||
# [1] and that's a completely ridiculous thing to expect callers to
|
||||
# do, so nobody does it and that's okay.
|
||||
self.close()
|
||||
|
||||
|
||||
class ThreadSafeSysLogHandler(SysLogHandler):
|
||||
def createLock(self):
|
||||
self.lock = PipeMutex()
|
||||
|
@ -21,7 +21,9 @@ import ctypes
|
||||
import contextlib
|
||||
import errno
|
||||
import eventlet
|
||||
import eventlet.debug
|
||||
import eventlet.event
|
||||
import eventlet.patcher
|
||||
import functools
|
||||
import grp
|
||||
import logging
|
||||
@ -1489,7 +1491,7 @@ class TestUtils(unittest.TestCase):
|
||||
'test1\ntest3\ntest4\ntest6\n')
|
||||
|
||||
def test_get_logger_sysloghandler_plumbing(self):
|
||||
orig_sysloghandler = utils.SysLogHandler
|
||||
orig_sysloghandler = utils.ThreadSafeSysLogHandler
|
||||
syslog_handler_args = []
|
||||
|
||||
def syslog_handler_catcher(*args, **kwargs):
|
||||
@ -1500,7 +1502,7 @@ class TestUtils(unittest.TestCase):
|
||||
syslog_handler_catcher.LOG_LOCAL3 = orig_sysloghandler.LOG_LOCAL3
|
||||
|
||||
try:
|
||||
utils.SysLogHandler = syslog_handler_catcher
|
||||
utils.ThreadSafeSysLogHandler = syslog_handler_catcher
|
||||
utils.get_logger({
|
||||
'log_facility': 'LOG_LOCAL3',
|
||||
}, 'server', log_route='server')
|
||||
@ -1550,7 +1552,7 @@ class TestUtils(unittest.TestCase):
|
||||
'facility': orig_sysloghandler.LOG_LOCAL0})],
|
||||
syslog_handler_args)
|
||||
finally:
|
||||
utils.SysLogHandler = orig_sysloghandler
|
||||
utils.ThreadSafeSysLogHandler = orig_sysloghandler
|
||||
|
||||
@reset_logger_state
|
||||
def test_clean_logger_exception(self):
|
||||
@ -6252,5 +6254,179 @@ name = %s
|
||||
reload_storage_policies()
|
||||
self.assertIsNotNone(POLICIES.get_by_name(self.policy_name))
|
||||
|
||||
|
||||
class TestPipeMutex(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.mutex = utils.PipeMutex()
|
||||
|
||||
def tearDown(self):
|
||||
self.mutex.close()
|
||||
|
||||
def test_nonblocking(self):
|
||||
evt_lock1 = eventlet.event.Event()
|
||||
evt_lock2 = eventlet.event.Event()
|
||||
evt_unlock = eventlet.event.Event()
|
||||
|
||||
def get_the_lock():
|
||||
self.mutex.acquire()
|
||||
evt_lock1.send('got the lock')
|
||||
evt_lock2.wait()
|
||||
self.mutex.release()
|
||||
evt_unlock.send('released the lock')
|
||||
|
||||
eventlet.spawn(get_the_lock)
|
||||
evt_lock1.wait() # Now, the other greenthread has the lock.
|
||||
|
||||
self.assertFalse(self.mutex.acquire(blocking=False))
|
||||
evt_lock2.send('please release the lock')
|
||||
evt_unlock.wait() # The other greenthread has released the lock.
|
||||
self.assertTrue(self.mutex.acquire(blocking=False))
|
||||
|
||||
def test_recursive(self):
|
||||
self.assertTrue(self.mutex.acquire(blocking=False))
|
||||
self.assertTrue(self.mutex.acquire(blocking=False))
|
||||
|
||||
def try_acquire_lock():
|
||||
return self.mutex.acquire(blocking=False)
|
||||
|
||||
self.assertFalse(eventlet.spawn(try_acquire_lock).wait())
|
||||
self.mutex.release()
|
||||
self.assertFalse(eventlet.spawn(try_acquire_lock).wait())
|
||||
self.mutex.release()
|
||||
self.assertTrue(eventlet.spawn(try_acquire_lock).wait())
|
||||
|
||||
def test_release_without_acquire(self):
|
||||
self.assertRaises(RuntimeError, self.mutex.release)
|
||||
|
||||
def test_too_many_releases(self):
|
||||
self.mutex.acquire()
|
||||
self.mutex.release()
|
||||
self.assertRaises(RuntimeError, self.mutex.release)
|
||||
|
||||
def test_wrong_releaser(self):
|
||||
self.mutex.acquire()
|
||||
self.assertRaises(RuntimeError,
|
||||
eventlet.spawn(self.mutex.release).wait)
|
||||
|
||||
def test_blocking(self):
|
||||
evt = eventlet.event.Event()
|
||||
|
||||
sequence = []
|
||||
|
||||
def coro1():
|
||||
eventlet.sleep(0) # let coro2 go
|
||||
|
||||
self.mutex.acquire()
|
||||
sequence.append('coro1 acquire')
|
||||
evt.send('go')
|
||||
self.mutex.release()
|
||||
sequence.append('coro1 release')
|
||||
|
||||
def coro2():
|
||||
evt.wait() # wait for coro1 to start us
|
||||
self.mutex.acquire()
|
||||
sequence.append('coro2 acquire')
|
||||
self.mutex.release()
|
||||
sequence.append('coro2 release')
|
||||
|
||||
c1 = eventlet.spawn(coro1)
|
||||
c2 = eventlet.spawn(coro2)
|
||||
|
||||
c1.wait()
|
||||
c2.wait()
|
||||
|
||||
self.assertEqual(sequence, [
|
||||
'coro1 acquire',
|
||||
'coro1 release',
|
||||
'coro2 acquire',
|
||||
'coro2 release'])
|
||||
|
||||
def test_blocking_tpool(self):
|
||||
# Note: this test's success isn't a guarantee that the mutex is
|
||||
# working. However, this test's failure means that the mutex is
|
||||
# definitely broken.
|
||||
sequence = []
|
||||
|
||||
def do_stuff():
|
||||
n = 10
|
||||
while n > 0:
|
||||
self.mutex.acquire()
|
||||
sequence.append("<")
|
||||
eventlet.sleep(0.0001)
|
||||
sequence.append(">")
|
||||
self.mutex.release()
|
||||
n -= 1
|
||||
|
||||
greenthread1 = eventlet.spawn(do_stuff)
|
||||
greenthread2 = eventlet.spawn(do_stuff)
|
||||
|
||||
real_thread1 = eventlet.patcher.original('threading').Thread(
|
||||
target=do_stuff)
|
||||
real_thread1.start()
|
||||
|
||||
real_thread2 = eventlet.patcher.original('threading').Thread(
|
||||
target=do_stuff)
|
||||
real_thread2.start()
|
||||
|
||||
greenthread1.wait()
|
||||
greenthread2.wait()
|
||||
real_thread1.join()
|
||||
real_thread2.join()
|
||||
|
||||
self.assertEqual(''.join(sequence), "<>" * 40)
|
||||
|
||||
def test_blocking_preserves_ownership(self):
|
||||
pthread1_event = eventlet.patcher.original('threading').Event()
|
||||
pthread2_event1 = eventlet.patcher.original('threading').Event()
|
||||
pthread2_event2 = eventlet.patcher.original('threading').Event()
|
||||
thread_id = []
|
||||
owner = []
|
||||
|
||||
def pthread1():
|
||||
thread_id.append(id(eventlet.greenthread.getcurrent()))
|
||||
self.mutex.acquire()
|
||||
owner.append(self.mutex.owner)
|
||||
pthread2_event1.set()
|
||||
|
||||
orig_os_write = utils.os.write
|
||||
|
||||
def patched_os_write(*a, **kw):
|
||||
try:
|
||||
return orig_os_write(*a, **kw)
|
||||
finally:
|
||||
pthread1_event.wait()
|
||||
|
||||
with mock.patch.object(utils.os, 'write', patched_os_write):
|
||||
self.mutex.release()
|
||||
pthread2_event2.set()
|
||||
|
||||
def pthread2():
|
||||
pthread2_event1.wait() # ensure pthread1 acquires lock first
|
||||
thread_id.append(id(eventlet.greenthread.getcurrent()))
|
||||
self.mutex.acquire()
|
||||
pthread1_event.set()
|
||||
pthread2_event2.wait()
|
||||
owner.append(self.mutex.owner)
|
||||
self.mutex.release()
|
||||
|
||||
real_thread1 = eventlet.patcher.original('threading').Thread(
|
||||
target=pthread1)
|
||||
real_thread1.start()
|
||||
|
||||
real_thread2 = eventlet.patcher.original('threading').Thread(
|
||||
target=pthread2)
|
||||
real_thread2.start()
|
||||
|
||||
real_thread1.join()
|
||||
real_thread2.join()
|
||||
self.assertEqual(thread_id, owner)
|
||||
self.assertIsNone(self.mutex.owner)
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
# PipeMutex turns this off when you instantiate one
|
||||
eventlet.debug.hub_prevent_multiple_readers(True)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
Loading…
Reference in New Issue
Block a user