Fix ValueError from handoff logging.

If you have StatsD logging turned on, then the iterator returned by
Ring.get_part_nodes() will emit StatsD packets when it yields a
handoff node. That network IO may make eventlet trampoline to another
greenthread before returning from next(). Now, if that other
greenthread tries to call next() on that same iterator, it blows up
with a ValueError.

Any socket IO inside a generator's next() method can cause this. It's
easiest to reproduce with StatsD logging turned on, but logging to
syslog can trigger it too.

You can see this happen sometimes in the proxy's make_requests method
if two of the primary nodes are down. Greenthread A goes into next()
to get a handoff node, then sends a StatsD packet, and so eventlet
trampolines to Greenthread B. Now, Greenthread B also calls next() to
get a handoff node, and dies with a ValueError.

This commit wraps up concurrently-accessed iter_nodes() iterators in a
new thing called a GreenthreadSafeIterator that serializes access.

Bug 1180110
Change-Id: I8fe13d7295c056a2cab9e084f5966078a49bdc13
This commit is contained in:
Samuel Merritt 2013-05-14 13:04:01 -07:00
parent deb01b840d
commit bd3ca778d1
4 changed files with 98 additions and 4 deletions

View File

@ -43,6 +43,7 @@ from urlparse import urlparse as stdlib_urlparse, ParseResult
import itertools
import eventlet
import eventlet.semaphore
from eventlet import GreenPool, sleep, Timeout, tpool
from eventlet.green import socket, threading
import netifaces
@ -409,6 +410,29 @@ def validate_device_partition(device, partition):
raise ValueError('Invalid partition: %s' % quote(partition or ''))
class GreenthreadSafeIterator(object):
"""
Wrap an iterator to ensure that only one greenthread is inside its next()
method at a time.
This is useful if an iterator's next() method may perform network IO, as
that may trigger a greenthread context switch (aka trampoline), which can
give another greenthread a chance to call next(). At that point, you get
an error like "ValueError: generator already executing". By wrapping calls
to next() with a mutex, we avoid that error.
"""
def __init__(self, unsafe_iterable):
self.unsafe_iter = iter(unsafe_iterable)
self.semaphore = eventlet.semaphore.Semaphore(value=1)
def __iter__(self):
return self
def next(self):
with self.semaphore:
return self.unsafe_iter.next()
class NullLogger():
"""A no-op logger for eventlet wsgi."""

View File

@ -35,7 +35,8 @@ from eventlet.timeout import Timeout
from swift.common.wsgi import make_pre_authed_request
from swift.common.utils import normalize_timestamp, config_true_value, \
public, split_path, cache_from_env, list_from_csv
public, split_path, cache_from_env, list_from_csv, \
GreenthreadSafeIterator
from swift.common.bufferedhttp import http_connect
from swift.common.exceptions import ChunkReadTimeout, ConnectionTimeout
from swift.common.http import is_informational, is_success, is_redirection, \
@ -557,6 +558,13 @@ class Controller(object):
nodes. If a node yielded subsequently gets error limited, an
extra node will be yielded to take its place.
Note that if you're going to iterate over this concurrently from
multiple greenthreads, you'll want to use a
swift.common.utils.GreenthreadSafeIterator to serialize access.
Otherwise, you may get ValueErrors from concurrent access. (You also
may not, depending on how logging is configured, the vagaries of
socket IO and eventlet, and the phase of the moon.)
:param ring: ring to get yield nodes from
:param partition: ring partition to yield nodes for
"""
@ -621,7 +629,7 @@ class Controller(object):
:returns: a swob.Response object
"""
start_nodes = ring.get_part_nodes(part)
nodes = self.iter_nodes(ring, part)
nodes = GreenthreadSafeIterator(self.iter_nodes(ring, part))
pile = GreenPile(len(start_nodes))
for head in headers:
pile.spawn(self._make_request, nodes, part, method, path,

View File

@ -37,7 +37,7 @@ from eventlet.queue import Queue
from eventlet.timeout import Timeout
from swift.common.utils import ContextPool, normalize_timestamp, \
config_true_value, public, json, csv_append
config_true_value, public, json, csv_append, GreenthreadSafeIterator
from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_metadata, check_object_creation, \
CONTAINER_LISTING_LIMIT, MAX_FILE_SIZE
@ -864,7 +864,8 @@ class ObjectController(Controller):
else:
delete_at_part = delete_at_nodes = None
node_iter = self.iter_nodes(self.app.object_ring, partition)
node_iter = GreenthreadSafeIterator(
self.iter_nodes(self.app.object_ring, partition))
pile = GreenPile(len(nodes))
chunked = req.headers.get('transfer-encoding')

View File

@ -19,6 +19,7 @@ from __future__ import with_statement
from test.unit import temptree
import ctypes
import errno
import eventlet
import logging
import os
import random
@ -1464,6 +1465,66 @@ class TestStatsdLogging(unittest.TestCase):
self.assert_(mock_controller.args[1] > 0)
class UnsafeXrange(object):
"""
Like xrange(limit), but with extra context switching to screw things up.
"""
def __init__(self, upper_bound):
self.current = 0
self.concurrent_calls = 0
self.upper_bound = upper_bound
def __iter__(self):
return self
def next(self):
if self.concurrent_calls > 0:
raise ValueError("concurrent access is bad, mmmkay? (%r)")
self.concurrent_calls += 1
try:
if self.current >= self.upper_bound:
raise StopIteration
else:
val = self.current
self.current += 1
eventlet.sleep() # yield control
return val
finally:
self.concurrent_calls -= 1
class TestGreenthreadSafeIterator(unittest.TestCase):
def increment(self, iterable):
plus_ones = []
for n in iterable:
plus_ones.append(n + 1)
return plus_ones
def test_setup_works(self):
# it should work without concurrent access
self.assertEquals([0, 1, 2, 3], list(UnsafeXrange(4)))
iterable = UnsafeXrange(10)
pile = eventlet.GreenPile(2)
for _ in xrange(2):
pile.spawn(self.increment, iterable)
try:
response = sorted([resp for resp in pile])
self.assertTrue(False, "test setup is insufficiently crazy")
except ValueError:
pass
def test_access_is_serialized(self):
pile = eventlet.GreenPile(2)
iterable = utils.GreenthreadSafeIterator(UnsafeXrange(10))
for _ in xrange(2):
pile.spawn(self.increment, iterable)
response = sorted(sum([resp for resp in pile], []))
self.assertEquals(range(1, 11), response)
class TestStatsdLoggingDelegation(unittest.TestCase):
def setUp(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)