From 6d160797fc3257942618a7914d526911ebbda328 Mon Sep 17 00:00:00 2001 From: Samuel Merritt Date: Mon, 14 Aug 2017 10:41:31 -0700 Subject: [PATCH] Fix deadlock when logging from a tpool thread. The object server runs certain IO-intensive methods outside the main pthread for performance. If one of those methods tries to log, this can cause a crash that eventually leads to an object server with hundreds or thousands of greenthreads, all deadlocked. The short version of the story is that logging.SysLogHandler has a mutex which Eventlet monkey-patches. However, the monkey-patched mutex sometimes breaks if used across different pthreads, and it breaks in such a way that it is still considered held. After that happens, any attempt to emit a log message blocks the calling greenthread forever. The fix is to use a mutex that works across different greenlets and across different pthreads. This patch introduces such a lock based on an anonymous pipe. Change-Id: I57decefaf5bbed57b97a62d0df8518b112917480 Closes-Bug: 1710328 --- swift/common/utils.py | 132 +++++++++++++++++++++++- test/unit/common/test_utils.py | 182 ++++++++++++++++++++++++++++++++- 2 files changed, 307 insertions(+), 7 deletions(-) diff --git a/swift/common/utils.py b/swift/common/utils.py index d7a79c8bb8..6723b99aac 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -50,9 +50,12 @@ import stat import datetime import eventlet +import eventlet.debug +import eventlet.greenthread import eventlet.semaphore from eventlet import GreenPool, sleep, Timeout, tpool from eventlet.green import socket, threading +from eventlet.hubs import trampoline import eventlet.queue import netifaces import codecs @@ -1894,17 +1897,18 @@ def get_logger(conf, name=None, log_to_console=False, log_route=None, if udp_host: udp_port = int(conf.get('log_udp_port', logging.handlers.SYSLOG_UDP_PORT)) - handler = SysLogHandler(address=(udp_host, udp_port), - facility=facility) + handler = 
ThreadSafeSysLogHandler(address=(udp_host, udp_port), + facility=facility) else: log_address = conf.get('log_address', '/dev/log') try: - handler = SysLogHandler(address=log_address, facility=facility) + handler = ThreadSafeSysLogHandler(address=log_address, + facility=facility) except socket.error as e: # Either /dev/log isn't a UNIX socket or it does not exist at all if e.errno not in [errno.ENOTSOCK, errno.ENOENT]: raise - handler = SysLogHandler(facility=facility) + handler = ThreadSafeSysLogHandler(facility=facility) handler.setFormatter(formatter) logger.addHandler(handler) get_logger.handler4logger[logger] = handler @@ -4304,3 +4308,123 @@ def replace_partition_in_path(path, part_power): path_components[-4] = "%d" % part return os.sep.join(path_components) + + +class PipeMutex(object): + """ + Mutex using a pipe. Works across both greenlets and real threads, even + at the same time. + """ + + def __init__(self): + self.rfd, self.wfd = os.pipe() + + # You can't create a pipe in non-blocking mode; you must set it + # later. + rflags = fcntl.fcntl(self.rfd, fcntl.F_GETFL) + fcntl.fcntl(self.rfd, fcntl.F_SETFL, rflags | os.O_NONBLOCK) + os.write(self.wfd, b'-') # start unlocked + + self.owner = None + self.recursion_depth = 0 + + # Usually, it's an error to have multiple greenthreads all waiting + # to read the same file descriptor. It's often a sign of inadequate + # concurrency control; for example, if you have two greenthreads + # trying to use the same memcache connection, they'll end up writing + # interleaved garbage to the socket or stealing part of each other's + # responses. + # + # In this case, we have multiple greenthreads waiting on the same + # file descriptor by design. This lets greenthreads in real thread A + # wait with greenthreads in real thread B for the same mutex. + # Therefore, we must turn off eventlet's multiple-reader detection.
+ # + # It would be better to turn off multiple-reader detection for only + # our calls to trampoline(), but eventlet does not support that. + eventlet.debug.hub_prevent_multiple_readers(False) + + def acquire(self, blocking=True): + """ + Acquire the mutex. + + If called with blocking=False, returns True if the mutex was + acquired and False if it wasn't. Otherwise, blocks until the mutex + is acquired and returns True. + + This lock is recursive; the same greenthread may acquire it as many + times as it wants to, though it must then release it that many times + too. + """ + current_greenthread_id = id(eventlet.greenthread.getcurrent()) + if self.owner == current_greenthread_id: + self.recursion_depth += 1 + return True + + while True: + try: + # If there is a byte available, this will read it and remove + # it from the pipe. If not, this will raise OSError with + # errno=EAGAIN. + os.read(self.rfd, 1) + self.owner = current_greenthread_id + return True + except OSError as err: + if err.errno != errno.EAGAIN: + raise + + if not blocking: + return False + + # Tell eventlet to suspend the current greenthread until + # self.rfd becomes readable. This will happen when someone + # else writes to self.wfd. + trampoline(self.rfd, read=True) + + def release(self): + """ + Release the mutex. + """ + current_greenthread_id = id(eventlet.greenthread.getcurrent()) + if self.owner != current_greenthread_id: + raise RuntimeError("cannot release un-acquired lock") + + if self.recursion_depth > 0: + self.recursion_depth -= 1 + return + + self.owner = None + os.write(self.wfd, b'X') + + def close(self): + """ + Close the mutex. This releases its file descriptors. + + You can't use a mutex after it's been closed. + """ + if self.wfd is not None: + os.close(self.rfd) + self.rfd = None + os.close(self.wfd) + self.wfd = None + self.owner = None + self.recursion_depth = 0 + + def __del__(self): + # We need this so we don't leak file descriptors. 
Otherwise, if you + # call get_logger() and don't explicitly dispose of it by calling + # logger.logger.handlers[0].lock.close() [1], the pipe file + # descriptors are leaked. + # + # This only really comes up in tests. Swift processes tend to call + # get_logger() once and then hang on to it until they exit, but the + # test suite calls get_logger() a lot. + # + # [1] and that's a completely ridiculous thing to expect callers to + # do, so nobody does it and that's okay. + self.close() + + +class ThreadSafeSysLogHandler(SysLogHandler): + def createLock(self): + self.lock = PipeMutex() diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index a38cc72233..c71b59b62f 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -21,7 +21,9 @@ import ctypes import contextlib import errno import eventlet +import eventlet.debug import eventlet.event +import eventlet.patcher import functools import grp import logging @@ -1489,7 +1491,7 @@ class TestUtils(unittest.TestCase): 'test1\ntest3\ntest4\ntest6\n') def test_get_logger_sysloghandler_plumbing(self): - orig_sysloghandler = utils.SysLogHandler + orig_sysloghandler = utils.ThreadSafeSysLogHandler syslog_handler_args = [] def syslog_handler_catcher(*args, **kwargs): @@ -1500,7 +1502,7 @@ class TestUtils(unittest.TestCase): syslog_handler_catcher.LOG_LOCAL3 = orig_sysloghandler.LOG_LOCAL3 try: - utils.SysLogHandler = syslog_handler_catcher + utils.ThreadSafeSysLogHandler = syslog_handler_catcher utils.get_logger({ 'log_facility': 'LOG_LOCAL3', }, 'server', log_route='server') @@ -1550,7 +1552,7 @@ class TestUtils(unittest.TestCase): 'facility': orig_sysloghandler.LOG_LOCAL0})], syslog_handler_args) finally: - utils.SysLogHandler = orig_sysloghandler + utils.ThreadSafeSysLogHandler = orig_sysloghandler @reset_logger_state def test_clean_logger_exception(self): @@ -6252,5 +6254,179 @@ name = %s reload_storage_policies() 
self.assertIsNotNone(POLICIES.get_by_name(self.policy_name)) + +class TestPipeMutex(unittest.TestCase): + def setUp(self): + self.mutex = utils.PipeMutex() + + def tearDown(self): + self.mutex.close() + + def test_nonblocking(self): + evt_lock1 = eventlet.event.Event() + evt_lock2 = eventlet.event.Event() + evt_unlock = eventlet.event.Event() + + def get_the_lock(): + self.mutex.acquire() + evt_lock1.send('got the lock') + evt_lock2.wait() + self.mutex.release() + evt_unlock.send('released the lock') + + eventlet.spawn(get_the_lock) + evt_lock1.wait() # Now, the other greenthread has the lock. + + self.assertFalse(self.mutex.acquire(blocking=False)) + evt_lock2.send('please release the lock') + evt_unlock.wait() # The other greenthread has released the lock. + self.assertTrue(self.mutex.acquire(blocking=False)) + + def test_recursive(self): + self.assertTrue(self.mutex.acquire(blocking=False)) + self.assertTrue(self.mutex.acquire(blocking=False)) + + def try_acquire_lock(): + return self.mutex.acquire(blocking=False) + + self.assertFalse(eventlet.spawn(try_acquire_lock).wait()) + self.mutex.release() + self.assertFalse(eventlet.spawn(try_acquire_lock).wait()) + self.mutex.release() + self.assertTrue(eventlet.spawn(try_acquire_lock).wait()) + + def test_release_without_acquire(self): + self.assertRaises(RuntimeError, self.mutex.release) + + def test_too_many_releases(self): + self.mutex.acquire() + self.mutex.release() + self.assertRaises(RuntimeError, self.mutex.release) + + def test_wrong_releaser(self): + self.mutex.acquire() + self.assertRaises(RuntimeError, + eventlet.spawn(self.mutex.release).wait) + + def test_blocking(self): + evt = eventlet.event.Event() + + sequence = [] + + def coro1(): + eventlet.sleep(0) # let coro2 go + + self.mutex.acquire() + sequence.append('coro1 acquire') + evt.send('go') + self.mutex.release() + sequence.append('coro1 release') + + def coro2(): + evt.wait() # wait for coro1 to start us + self.mutex.acquire() + 
sequence.append('coro2 acquire') + self.mutex.release() + sequence.append('coro2 release') + + c1 = eventlet.spawn(coro1) + c2 = eventlet.spawn(coro2) + + c1.wait() + c2.wait() + + self.assertEqual(sequence, [ + 'coro1 acquire', + 'coro1 release', + 'coro2 acquire', + 'coro2 release']) + + def test_blocking_tpool(self): + # Note: this test's success isn't a guarantee that the mutex is + # working. However, this test's failure means that the mutex is + # definitely broken. + sequence = [] + + def do_stuff(): + n = 10 + while n > 0: + self.mutex.acquire() + sequence.append("<") + eventlet.sleep(0.0001) + sequence.append(">") + self.mutex.release() + n -= 1 + + greenthread1 = eventlet.spawn(do_stuff) + greenthread2 = eventlet.spawn(do_stuff) + + real_thread1 = eventlet.patcher.original('threading').Thread( + target=do_stuff) + real_thread1.start() + + real_thread2 = eventlet.patcher.original('threading').Thread( + target=do_stuff) + real_thread2.start() + + greenthread1.wait() + greenthread2.wait() + real_thread1.join() + real_thread2.join() + + self.assertEqual(''.join(sequence), "<>" * 40) + + def test_blocking_preserves_ownership(self): + pthread1_event = eventlet.patcher.original('threading').Event() + pthread2_event1 = eventlet.patcher.original('threading').Event() + pthread2_event2 = eventlet.patcher.original('threading').Event() + thread_id = [] + owner = [] + + def pthread1(): + thread_id.append(id(eventlet.greenthread.getcurrent())) + self.mutex.acquire() + owner.append(self.mutex.owner) + pthread2_event1.set() + + orig_os_write = utils.os.write + + def patched_os_write(*a, **kw): + try: + return orig_os_write(*a, **kw) + finally: + pthread1_event.wait() + + with mock.patch.object(utils.os, 'write', patched_os_write): + self.mutex.release() + pthread2_event2.set() + + def pthread2(): + pthread2_event1.wait() # ensure pthread1 acquires lock first + thread_id.append(id(eventlet.greenthread.getcurrent())) + self.mutex.acquire() + pthread1_event.set() + 
pthread2_event2.wait() + owner.append(self.mutex.owner) + self.mutex.release() + + real_thread1 = eventlet.patcher.original('threading').Thread( + target=pthread1) + real_thread1.start() + + real_thread2 = eventlet.patcher.original('threading').Thread( + target=pthread2) + real_thread2.start() + + real_thread1.join() + real_thread2.join() + self.assertEqual(thread_id, owner) + self.assertIsNone(self.mutex.owner) + + @classmethod + def tearDownClass(cls): + # PipeMutex turns this off when you instantiate one + eventlet.debug.hub_prevent_multiple_readers(True) + + if __name__ == '__main__': unittest.main()