Fix deadlock when logging from a tpool thread.
The object server runs certain IO-intensive methods outside the main pthread for performance. If one of those methods tries to log, this can cause a crash that eventually leads to an object server with hundreds or thousands of greenthreads, all deadlocked. The short version of the story is that logging.SysLogHandler has a mutex which Eventlet monkey-patches. However, the monkey-patched mutex sometimes breaks if used across different pthreads, and it breaks in such a way that it is still considered held. After that happens, any attempt to emit a log message blocks the calling greenthread forever. The fix is to use a mutex that works across different greenlets and across different pthreads. This patch introduces such a lock based on an anonymous pipe. Change-Id: I57decefaf5bbed57b97a62d0df8518b112917480 Closes-Bug: 1710328
This commit is contained in:
parent
bf09a06708
commit
6d160797fc
@ -50,9 +50,12 @@ import stat
|
|||||||
import datetime
|
import datetime
|
||||||
|
|
||||||
import eventlet
|
import eventlet
|
||||||
|
import eventlet.debug
|
||||||
|
import eventlet.greenthread
|
||||||
import eventlet.semaphore
|
import eventlet.semaphore
|
||||||
from eventlet import GreenPool, sleep, Timeout, tpool
|
from eventlet import GreenPool, sleep, Timeout, tpool
|
||||||
from eventlet.green import socket, threading
|
from eventlet.green import socket, threading
|
||||||
|
from eventlet.hubs import trampoline
|
||||||
import eventlet.queue
|
import eventlet.queue
|
||||||
import netifaces
|
import netifaces
|
||||||
import codecs
|
import codecs
|
||||||
@ -1894,17 +1897,18 @@ def get_logger(conf, name=None, log_to_console=False, log_route=None,
|
|||||||
if udp_host:
|
if udp_host:
|
||||||
udp_port = int(conf.get('log_udp_port',
|
udp_port = int(conf.get('log_udp_port',
|
||||||
logging.handlers.SYSLOG_UDP_PORT))
|
logging.handlers.SYSLOG_UDP_PORT))
|
||||||
handler = SysLogHandler(address=(udp_host, udp_port),
|
handler = ThreadSafeSysLogHandler(address=(udp_host, udp_port),
|
||||||
facility=facility)
|
facility=facility)
|
||||||
else:
|
else:
|
||||||
log_address = conf.get('log_address', '/dev/log')
|
log_address = conf.get('log_address', '/dev/log')
|
||||||
try:
|
try:
|
||||||
handler = SysLogHandler(address=log_address, facility=facility)
|
handler = ThreadSafeSysLogHandler(address=log_address,
|
||||||
|
facility=facility)
|
||||||
except socket.error as e:
|
except socket.error as e:
|
||||||
# Either /dev/log isn't a UNIX socket or it does not exist at all
|
# Either /dev/log isn't a UNIX socket or it does not exist at all
|
||||||
if e.errno not in [errno.ENOTSOCK, errno.ENOENT]:
|
if e.errno not in [errno.ENOTSOCK, errno.ENOENT]:
|
||||||
raise
|
raise
|
||||||
handler = SysLogHandler(facility=facility)
|
handler = ThreadSafeSysLogHandler(facility=facility)
|
||||||
handler.setFormatter(formatter)
|
handler.setFormatter(formatter)
|
||||||
logger.addHandler(handler)
|
logger.addHandler(handler)
|
||||||
get_logger.handler4logger[logger] = handler
|
get_logger.handler4logger[logger] = handler
|
||||||
@ -4304,3 +4308,123 @@ def replace_partition_in_path(path, part_power):
|
|||||||
path_components[-4] = "%d" % part
|
path_components[-4] = "%d" % part
|
||||||
|
|
||||||
return os.sep.join(path_components)
|
return os.sep.join(path_components)
|
||||||
|
|
||||||
|
|
||||||
|
class PipeMutex(object):
|
||||||
|
"""
|
||||||
|
Mutex using a pipe. Works across both greenlets and real threads, even
|
||||||
|
at the same time.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.rfd, self.wfd = os.pipe()
|
||||||
|
|
||||||
|
# You can't create a pipe in non-blocking mode; you must set it
|
||||||
|
# later.
|
||||||
|
rflags = fcntl.fcntl(self.rfd, fcntl.F_GETFL)
|
||||||
|
fcntl.fcntl(self.rfd, fcntl.F_SETFL, rflags | os.O_NONBLOCK)
|
||||||
|
os.write(self.wfd, b'-') # start unlocked
|
||||||
|
|
||||||
|
self.owner = None
|
||||||
|
self.recursion_depth = 0
|
||||||
|
|
||||||
|
# Usually, it's an error to have multiple greenthreads all waiting
|
||||||
|
# to read the same file descriptor. It's often a sign of inadequate
|
||||||
|
# concurrency control; for example, if you have two greenthreads
|
||||||
|
# trying to use the same memcache connection, they'll end up writing
|
||||||
|
# interleaved garbage to the socket or stealing part of each others'
|
||||||
|
# responses.
|
||||||
|
#
|
||||||
|
# In this case, we have multiple greenthreads waiting on the same
|
||||||
|
# file descriptor by design. This lets greenthreads in real thread A
|
||||||
|
# wait with greenthreads in real thread B for the same mutex.
|
||||||
|
# Therefore, we must turn off eventlet's multiple-reader detection.
|
||||||
|
#
|
||||||
|
# It would be better to turn off multiple-reader detection for only
|
||||||
|
# our calls to trampoline(), but eventlet does not support that.
|
||||||
|
eventlet.debug.hub_prevent_multiple_readers(False)
|
||||||
|
|
||||||
|
def acquire(self, blocking=True):
|
||||||
|
"""
|
||||||
|
Acquire the mutex.
|
||||||
|
|
||||||
|
If called with blocking=False, returns True if the mutex was
|
||||||
|
acquired and False if it wasn't. Otherwise, blocks until the mutex
|
||||||
|
is acquired and returns True.
|
||||||
|
|
||||||
|
This lock is recursive; the same greenthread may acquire it as many
|
||||||
|
times as it wants to, though it must then release it that many times
|
||||||
|
too.
|
||||||
|
"""
|
||||||
|
current_greenthread_id = id(eventlet.greenthread.getcurrent())
|
||||||
|
if self.owner == current_greenthread_id:
|
||||||
|
self.recursion_depth += 1
|
||||||
|
return True
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
# If there is a byte available, this will read it and remove
|
||||||
|
# it from the pipe. If not, this will raise OSError with
|
||||||
|
# errno=EAGAIN.
|
||||||
|
os.read(self.rfd, 1)
|
||||||
|
self.owner = current_greenthread_id
|
||||||
|
return True
|
||||||
|
except OSError as err:
|
||||||
|
if err.errno != errno.EAGAIN:
|
||||||
|
raise
|
||||||
|
|
||||||
|
if not blocking:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Tell eventlet to suspend the current greenthread until
|
||||||
|
# self.rfd becomes readable. This will happen when someone
|
||||||
|
# else writes to self.wfd.
|
||||||
|
trampoline(self.rfd, read=True)
|
||||||
|
|
||||||
|
def release(self):
|
||||||
|
"""
|
||||||
|
Release the mutex.
|
||||||
|
"""
|
||||||
|
current_greenthread_id = id(eventlet.greenthread.getcurrent())
|
||||||
|
if self.owner != current_greenthread_id:
|
||||||
|
raise RuntimeError("cannot release un-acquired lock")
|
||||||
|
|
||||||
|
if self.recursion_depth > 0:
|
||||||
|
self.recursion_depth -= 1
|
||||||
|
return
|
||||||
|
|
||||||
|
self.owner = None
|
||||||
|
os.write(self.wfd, b'X')
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
"""
|
||||||
|
Close the mutex. This releases its file descriptors.
|
||||||
|
|
||||||
|
You can't use a mutex after it's been closed.
|
||||||
|
"""
|
||||||
|
if self.wfd is not None:
|
||||||
|
os.close(self.rfd)
|
||||||
|
self.rfd = None
|
||||||
|
os.close(self.wfd)
|
||||||
|
self.wfd = None
|
||||||
|
self.owner = None
|
||||||
|
self.recursion_depth = 0
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
# We need this so we don't leak file descriptors. Otherwise, if you
|
||||||
|
# call get_logger() and don't explicitly dispose of it by calling
|
||||||
|
# logger.logger.handlers[0].lock.close() [1], the pipe file
|
||||||
|
# descriptors are leaked.
|
||||||
|
#
|
||||||
|
# This only really comes up in tests. Swift processes tend to call
|
||||||
|
# get_logger() once and then hang on to it until they exit, but the
|
||||||
|
# test suite calls get_logger() a lot.
|
||||||
|
#
|
||||||
|
# [1] and that's a completely ridiculous thing to expect callers to
|
||||||
|
# do, so nobody does it and that's okay.
|
||||||
|
self.close()
|
||||||
|
|
||||||
|
|
||||||
|
class ThreadSafeSysLogHandler(SysLogHandler):
|
||||||
|
def createLock(self):
|
||||||
|
self.lock = PipeMutex()
|
||||||
|
@ -21,7 +21,9 @@ import ctypes
|
|||||||
import contextlib
|
import contextlib
|
||||||
import errno
|
import errno
|
||||||
import eventlet
|
import eventlet
|
||||||
|
import eventlet.debug
|
||||||
import eventlet.event
|
import eventlet.event
|
||||||
|
import eventlet.patcher
|
||||||
import functools
|
import functools
|
||||||
import grp
|
import grp
|
||||||
import logging
|
import logging
|
||||||
@ -1489,7 +1491,7 @@ class TestUtils(unittest.TestCase):
|
|||||||
'test1\ntest3\ntest4\ntest6\n')
|
'test1\ntest3\ntest4\ntest6\n')
|
||||||
|
|
||||||
def test_get_logger_sysloghandler_plumbing(self):
|
def test_get_logger_sysloghandler_plumbing(self):
|
||||||
orig_sysloghandler = utils.SysLogHandler
|
orig_sysloghandler = utils.ThreadSafeSysLogHandler
|
||||||
syslog_handler_args = []
|
syslog_handler_args = []
|
||||||
|
|
||||||
def syslog_handler_catcher(*args, **kwargs):
|
def syslog_handler_catcher(*args, **kwargs):
|
||||||
@ -1500,7 +1502,7 @@ class TestUtils(unittest.TestCase):
|
|||||||
syslog_handler_catcher.LOG_LOCAL3 = orig_sysloghandler.LOG_LOCAL3
|
syslog_handler_catcher.LOG_LOCAL3 = orig_sysloghandler.LOG_LOCAL3
|
||||||
|
|
||||||
try:
|
try:
|
||||||
utils.SysLogHandler = syslog_handler_catcher
|
utils.ThreadSafeSysLogHandler = syslog_handler_catcher
|
||||||
utils.get_logger({
|
utils.get_logger({
|
||||||
'log_facility': 'LOG_LOCAL3',
|
'log_facility': 'LOG_LOCAL3',
|
||||||
}, 'server', log_route='server')
|
}, 'server', log_route='server')
|
||||||
@ -1550,7 +1552,7 @@ class TestUtils(unittest.TestCase):
|
|||||||
'facility': orig_sysloghandler.LOG_LOCAL0})],
|
'facility': orig_sysloghandler.LOG_LOCAL0})],
|
||||||
syslog_handler_args)
|
syslog_handler_args)
|
||||||
finally:
|
finally:
|
||||||
utils.SysLogHandler = orig_sysloghandler
|
utils.ThreadSafeSysLogHandler = orig_sysloghandler
|
||||||
|
|
||||||
@reset_logger_state
|
@reset_logger_state
|
||||||
def test_clean_logger_exception(self):
|
def test_clean_logger_exception(self):
|
||||||
@ -6252,5 +6254,179 @@ name = %s
|
|||||||
reload_storage_policies()
|
reload_storage_policies()
|
||||||
self.assertIsNotNone(POLICIES.get_by_name(self.policy_name))
|
self.assertIsNotNone(POLICIES.get_by_name(self.policy_name))
|
||||||
|
|
||||||
|
|
||||||
|
class TestPipeMutex(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.mutex = utils.PipeMutex()
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
self.mutex.close()
|
||||||
|
|
||||||
|
def test_nonblocking(self):
|
||||||
|
evt_lock1 = eventlet.event.Event()
|
||||||
|
evt_lock2 = eventlet.event.Event()
|
||||||
|
evt_unlock = eventlet.event.Event()
|
||||||
|
|
||||||
|
def get_the_lock():
|
||||||
|
self.mutex.acquire()
|
||||||
|
evt_lock1.send('got the lock')
|
||||||
|
evt_lock2.wait()
|
||||||
|
self.mutex.release()
|
||||||
|
evt_unlock.send('released the lock')
|
||||||
|
|
||||||
|
eventlet.spawn(get_the_lock)
|
||||||
|
evt_lock1.wait() # Now, the other greenthread has the lock.
|
||||||
|
|
||||||
|
self.assertFalse(self.mutex.acquire(blocking=False))
|
||||||
|
evt_lock2.send('please release the lock')
|
||||||
|
evt_unlock.wait() # The other greenthread has released the lock.
|
||||||
|
self.assertTrue(self.mutex.acquire(blocking=False))
|
||||||
|
|
||||||
|
def test_recursive(self):
|
||||||
|
self.assertTrue(self.mutex.acquire(blocking=False))
|
||||||
|
self.assertTrue(self.mutex.acquire(blocking=False))
|
||||||
|
|
||||||
|
def try_acquire_lock():
|
||||||
|
return self.mutex.acquire(blocking=False)
|
||||||
|
|
||||||
|
self.assertFalse(eventlet.spawn(try_acquire_lock).wait())
|
||||||
|
self.mutex.release()
|
||||||
|
self.assertFalse(eventlet.spawn(try_acquire_lock).wait())
|
||||||
|
self.mutex.release()
|
||||||
|
self.assertTrue(eventlet.spawn(try_acquire_lock).wait())
|
||||||
|
|
||||||
|
def test_release_without_acquire(self):
|
||||||
|
self.assertRaises(RuntimeError, self.mutex.release)
|
||||||
|
|
||||||
|
def test_too_many_releases(self):
|
||||||
|
self.mutex.acquire()
|
||||||
|
self.mutex.release()
|
||||||
|
self.assertRaises(RuntimeError, self.mutex.release)
|
||||||
|
|
||||||
|
def test_wrong_releaser(self):
|
||||||
|
self.mutex.acquire()
|
||||||
|
self.assertRaises(RuntimeError,
|
||||||
|
eventlet.spawn(self.mutex.release).wait)
|
||||||
|
|
||||||
|
def test_blocking(self):
|
||||||
|
evt = eventlet.event.Event()
|
||||||
|
|
||||||
|
sequence = []
|
||||||
|
|
||||||
|
def coro1():
|
||||||
|
eventlet.sleep(0) # let coro2 go
|
||||||
|
|
||||||
|
self.mutex.acquire()
|
||||||
|
sequence.append('coro1 acquire')
|
||||||
|
evt.send('go')
|
||||||
|
self.mutex.release()
|
||||||
|
sequence.append('coro1 release')
|
||||||
|
|
||||||
|
def coro2():
|
||||||
|
evt.wait() # wait for coro1 to start us
|
||||||
|
self.mutex.acquire()
|
||||||
|
sequence.append('coro2 acquire')
|
||||||
|
self.mutex.release()
|
||||||
|
sequence.append('coro2 release')
|
||||||
|
|
||||||
|
c1 = eventlet.spawn(coro1)
|
||||||
|
c2 = eventlet.spawn(coro2)
|
||||||
|
|
||||||
|
c1.wait()
|
||||||
|
c2.wait()
|
||||||
|
|
||||||
|
self.assertEqual(sequence, [
|
||||||
|
'coro1 acquire',
|
||||||
|
'coro1 release',
|
||||||
|
'coro2 acquire',
|
||||||
|
'coro2 release'])
|
||||||
|
|
||||||
|
def test_blocking_tpool(self):
|
||||||
|
# Note: this test's success isn't a guarantee that the mutex is
|
||||||
|
# working. However, this test's failure means that the mutex is
|
||||||
|
# definitely broken.
|
||||||
|
sequence = []
|
||||||
|
|
||||||
|
def do_stuff():
|
||||||
|
n = 10
|
||||||
|
while n > 0:
|
||||||
|
self.mutex.acquire()
|
||||||
|
sequence.append("<")
|
||||||
|
eventlet.sleep(0.0001)
|
||||||
|
sequence.append(">")
|
||||||
|
self.mutex.release()
|
||||||
|
n -= 1
|
||||||
|
|
||||||
|
greenthread1 = eventlet.spawn(do_stuff)
|
||||||
|
greenthread2 = eventlet.spawn(do_stuff)
|
||||||
|
|
||||||
|
real_thread1 = eventlet.patcher.original('threading').Thread(
|
||||||
|
target=do_stuff)
|
||||||
|
real_thread1.start()
|
||||||
|
|
||||||
|
real_thread2 = eventlet.patcher.original('threading').Thread(
|
||||||
|
target=do_stuff)
|
||||||
|
real_thread2.start()
|
||||||
|
|
||||||
|
greenthread1.wait()
|
||||||
|
greenthread2.wait()
|
||||||
|
real_thread1.join()
|
||||||
|
real_thread2.join()
|
||||||
|
|
||||||
|
self.assertEqual(''.join(sequence), "<>" * 40)
|
||||||
|
|
||||||
|
def test_blocking_preserves_ownership(self):
|
||||||
|
pthread1_event = eventlet.patcher.original('threading').Event()
|
||||||
|
pthread2_event1 = eventlet.patcher.original('threading').Event()
|
||||||
|
pthread2_event2 = eventlet.patcher.original('threading').Event()
|
||||||
|
thread_id = []
|
||||||
|
owner = []
|
||||||
|
|
||||||
|
def pthread1():
|
||||||
|
thread_id.append(id(eventlet.greenthread.getcurrent()))
|
||||||
|
self.mutex.acquire()
|
||||||
|
owner.append(self.mutex.owner)
|
||||||
|
pthread2_event1.set()
|
||||||
|
|
||||||
|
orig_os_write = utils.os.write
|
||||||
|
|
||||||
|
def patched_os_write(*a, **kw):
|
||||||
|
try:
|
||||||
|
return orig_os_write(*a, **kw)
|
||||||
|
finally:
|
||||||
|
pthread1_event.wait()
|
||||||
|
|
||||||
|
with mock.patch.object(utils.os, 'write', patched_os_write):
|
||||||
|
self.mutex.release()
|
||||||
|
pthread2_event2.set()
|
||||||
|
|
||||||
|
def pthread2():
|
||||||
|
pthread2_event1.wait() # ensure pthread1 acquires lock first
|
||||||
|
thread_id.append(id(eventlet.greenthread.getcurrent()))
|
||||||
|
self.mutex.acquire()
|
||||||
|
pthread1_event.set()
|
||||||
|
pthread2_event2.wait()
|
||||||
|
owner.append(self.mutex.owner)
|
||||||
|
self.mutex.release()
|
||||||
|
|
||||||
|
real_thread1 = eventlet.patcher.original('threading').Thread(
|
||||||
|
target=pthread1)
|
||||||
|
real_thread1.start()
|
||||||
|
|
||||||
|
real_thread2 = eventlet.patcher.original('threading').Thread(
|
||||||
|
target=pthread2)
|
||||||
|
real_thread2.start()
|
||||||
|
|
||||||
|
real_thread1.join()
|
||||||
|
real_thread2.join()
|
||||||
|
self.assertEqual(thread_id, owner)
|
||||||
|
self.assertIsNone(self.mutex.owner)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def tearDownClass(cls):
|
||||||
|
# PipeMutex turns this off when you instantiate one
|
||||||
|
eventlet.debug.hub_prevent_multiple_readers(True)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user