From bd3ca778d10c461210f26ea3d194cacc70bef975 Mon Sep 17 00:00:00 2001 From: Samuel Merritt Date: Tue, 14 May 2013 13:04:01 -0700 Subject: [PATCH] Fix ValueError from handoff logging. If you have StatsD logging turned on, then the iterator returned by Ring.get_part_nodes() will emit StatsD packets when it yields a handoff node. That network IO may make eventlet trampoline to another greenthread before returning from next(). Now, if that other greenthread tries to call next() on that same iterator, it blows up with a ValueError. Any socket IO inside a generator's next() method can cause this. It's easiest to reproduce with StatsD logging turned on, but logging to syslog can trigger it too. You can see this happen sometimes in the proxy's make_requests method if two of the primary nodes are down. Greenthread A goes into next() to get a handoff node, then sends a StatsD packet, and so eventlet trampolines to Greenthread B. Now, Greenthread B also calls next() to get a handoff node, and dies with a ValueError. This commit wraps up concurrently-accessed iter_nodes() iterators in a new thing called a GreenthreadSafeIterator that serializes access. Bug 1180110 Change-Id: I8fe13d7295c056a2cab9e084f5966078a49bdc13 --- swift/common/utils.py | 24 +++++++++++++ swift/proxy/controllers/base.py | 12 +++++-- swift/proxy/controllers/obj.py | 5 +-- test/unit/common/test_utils.py | 61 +++++++++++++++++++++++++++++++++ 4 files changed, 98 insertions(+), 4 deletions(-) diff --git a/swift/common/utils.py b/swift/common/utils.py index 8ea9ea4392..fbf92eb1ec 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -43,6 +43,7 @@ from urlparse import urlparse as stdlib_urlparse, ParseResult import itertools import eventlet +import eventlet.semaphore from eventlet import GreenPool, sleep, Timeout, tpool from eventlet.green import socket, threading import netifaces @@ -409,6 +410,29 @@ def validate_device_partition(device, partition): raise ValueError('Invalid partition: %s' % quote(partition or '')) +class GreenthreadSafeIterator(object): + """ + Wrap an iterator to ensure that only one greenthread is inside its next() + method at a time. + + This is useful if an iterator's next() method may perform network IO, as + that may trigger a greenthread context switch (aka trampoline), which can + give another greenthread a chance to call next(). At that point, you get + an error like "ValueError: generator already executing". By wrapping calls + to next() with a mutex, we avoid that error. + """ + def __init__(self, unsafe_iterable): + self.unsafe_iter = iter(unsafe_iterable) + self.semaphore = eventlet.semaphore.Semaphore(value=1) + + def __iter__(self): + return self + + def next(self): + with self.semaphore: + return self.unsafe_iter.next() + + class NullLogger(): """A no-op logger for eventlet wsgi.""" diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 5c056cd3fe..e0a86a8469 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -35,7 +35,8 @@ from eventlet.timeout import Timeout from swift.common.wsgi import make_pre_authed_request from swift.common.utils import normalize_timestamp, config_true_value, \ - public, split_path, cache_from_env, list_from_csv + public, split_path, cache_from_env, list_from_csv, \ + GreenthreadSafeIterator from swift.common.bufferedhttp import http_connect from swift.common.exceptions import ChunkReadTimeout, ConnectionTimeout from swift.common.http import is_informational, is_success, is_redirection, \ @@ -557,6 +558,13 @@ class Controller(object): nodes. If a node yielded subsequently gets error limited, an extra node will be yielded to take its place. + Note that if you're going to iterate over this concurrently from + multiple greenthreads, you'll want to use a + swift.common.utils.GreenthreadSafeIterator to serialize access. + Otherwise, you may get ValueErrors from concurrent access. (You also + may not, depending on how logging is configured, the vagaries of + socket IO and eventlet, and the phase of the moon.) + :param ring: ring to get yield nodes from :param partition: ring partition to yield nodes for """ @@ -621,7 +629,7 @@ class Controller(object): :returns: a swob.Response object """ start_nodes = ring.get_part_nodes(part) - nodes = self.iter_nodes(ring, part) + nodes = GreenthreadSafeIterator(self.iter_nodes(ring, part)) pile = GreenPile(len(start_nodes)) for head in headers: pile.spawn(self._make_request, nodes, part, method, path, diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index f27bece0d7..5bdd9be362 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -37,7 +37,7 @@ from eventlet.queue import Queue from eventlet.timeout import Timeout from swift.common.utils import ContextPool, normalize_timestamp, \ - config_true_value, public, json, csv_append + config_true_value, public, json, csv_append, GreenthreadSafeIterator from swift.common.bufferedhttp import http_connect from swift.common.constraints import check_metadata, check_object_creation, \ CONTAINER_LISTING_LIMIT, MAX_FILE_SIZE @@ -864,7 +864,8 @@ class ObjectController(Controller): else: delete_at_part = delete_at_nodes = None - node_iter = self.iter_nodes(self.app.object_ring, partition) + node_iter = GreenthreadSafeIterator( + self.iter_nodes(self.app.object_ring, partition)) pile = GreenPile(len(nodes)) chunked = req.headers.get('transfer-encoding') diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 44a741e071..90de9479d0 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -19,6 +19,7 @@ from __future__ import with_statement from test.unit import temptree import ctypes import errno +import eventlet import logging import os import random @@ -1464,6 +1465,66 @@ class TestStatsdLogging(unittest.TestCase): self.assert_(mock_controller.args[1] > 0) +class UnsafeXrange(object): + """ + Like xrange(limit), but with extra context switching to screw things up. + """ + def __init__(self, upper_bound): + self.current = 0 + self.concurrent_calls = 0 + self.upper_bound = upper_bound + + def __iter__(self): + return self + + def next(self): + if self.concurrent_calls > 0: + raise ValueError("concurrent access is bad, mmmkay? (%r)") + + self.concurrent_calls += 1 + try: + if self.current >= self.upper_bound: + raise StopIteration + else: + val = self.current + self.current += 1 + eventlet.sleep() # yield control + return val + finally: + self.concurrent_calls -= 1 + + +class TestGreenthreadSafeIterator(unittest.TestCase): + def increment(self, iterable): + plus_ones = [] + for n in iterable: + plus_ones.append(n + 1) + return plus_ones + + def test_setup_works(self): + # it should work without concurrent access + self.assertEquals([0, 1, 2, 3], list(UnsafeXrange(4))) + + iterable = UnsafeXrange(10) + pile = eventlet.GreenPile(2) + for _ in xrange(2): + pile.spawn(self.increment, iterable) + + try: + response = sorted([resp for resp in pile]) + self.assertTrue(False, "test setup is insufficiently crazy") + except ValueError: + pass + + def test_access_is_serialized(self): + pile = eventlet.GreenPile(2) + iterable = utils.GreenthreadSafeIterator(UnsafeXrange(10)) + for _ in xrange(2): + pile.spawn(self.increment, iterable) + response = sorted(sum([resp for resp in pile], [])) + self.assertEquals(range(1, 11), response) + + class TestStatsdLoggingDelegation(unittest.TestCase): def setUp(self): self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)