Event simulator II

The previous event simulator simulated time by making all tasks that
would have been launched as threads run in the same thread. That only
worked to a point but didn't properly simulate more complex behaviors
such as clustering.

This new one handles things properly by still running tasks in
eventlet's greenthreads but forcing them to only run one at a time.

Change-Id: Ife834e8d55193da8416ba700e55e7b0c2496f532
This commit is contained in:
Tim Simpson 2014-09-24 13:45:52 -05:00
parent a5c5150711
commit 220e3e7255
9 changed files with 395 additions and 122 deletions

View File

@ -190,7 +190,7 @@ paste.filter_factory = trove.common.wsgi:ContextMiddleware.factory
paste.filter_factory = trove.common.wsgi:FaultWrapper.factory paste.filter_factory = trove.common.wsgi:FaultWrapper.factory
[filter:ratelimit] [filter:ratelimit]
paste.filter_factory = trove.common.limits:RateLimitingMiddleware.factory paste.filter_factory = trove.tests.fakes.limits:FakeRateLimitingMiddleware.factory
[app:troveapp] [app:troveapp]
paste.app_factory = trove.common.api:app_factory paste.app_factory = trove.common.api:app_factory

View File

@ -16,6 +16,7 @@
# under the License. # under the License.
# #
import functools
import gettext import gettext
import os import os
import urllib import urllib
@ -34,6 +35,7 @@ import eventlet
eventlet.monkey_patch(thread=False) eventlet.monkey_patch(thread=False)
CONF = cfg.CONF CONF = cfg.CONF
original_excepthook = sys.excepthook
def add_support_for_localization(): def add_support_for_localization():
@ -153,36 +155,46 @@ def initialize_fakes(app):
wsgi_interceptor) wsgi_interceptor)
from trove.tests.util import event_simulator from trove.tests.util import event_simulator
event_simulator.monkey_patch() event_simulator.monkey_patch()
from trove.tests.fakes import taskmanager
taskmanager.monkey_patch()
def parse_args_for_test_config(): def parse_args_for_test_config():
test_conf = 'etc/tests/localhost.test.conf'
repl = False
new_argv = []
for index in range(len(sys.argv)): for index in range(len(sys.argv)):
arg = sys.argv[index] arg = sys.argv[index]
print(arg) print(arg)
if arg[:14] == "--test-config=": if arg[:14] == "--test-config=":
del sys.argv[index] test_conf = arg[14:]
return arg[14:] elif arg == "--repl":
return 'etc/tests/localhost.test.conf' repl = True
else:
new_argv.append(arg)
sys.argv = new_argv
return test_conf, repl
if __name__ == "__main__":
try:
wsgi_install()
add_support_for_localization()
# Load Trove app
# Paste file needs absolute path
config_file = os.path.realpath('etc/trove/trove.conf.test')
# 'etc/trove/test-api-paste.ini'
app = initialize_trove(config_file)
# Initialize sqlite database.
initialize_database()
# Swap out WSGI, httplib, and several sleep functions
# with test doubles.
initialize_fakes(app)
# Initialize the test configuration. def run_tests(repl):
test_config_file = parse_args_for_test_config() """Runs all of the tests."""
CONFIG.load_from_file(test_config_file)
if repl:
# Actually show errors in the repl.
sys.excepthook = original_excepthook
def no_thanks(exit_code):
print("Tests finished with exit code %d." % exit_code)
sys.exit = no_thanks
proboscis.TestProgram().run_and_exit()
if repl:
import code
code.interact()
def import_tests():
# F401 unused imports needed for tox tests # F401 unused imports needed for tox tests
from trove.tests.api import backups # noqa from trove.tests.api import backups # noqa
from trove.tests.api import header # noqa from trove.tests.api import header # noqa
@ -210,9 +222,38 @@ if __name__ == "__main__":
from trove.tests.api.mgmt import storage # noqa from trove.tests.api.mgmt import storage # noqa
from trove.tests.api.mgmt import malformed_json # noqa from trove.tests.api.mgmt import malformed_json # noqa
from trove.tests.db import migrations # noqa from trove.tests.db import migrations # noqa
def main(import_func):
try:
wsgi_install()
add_support_for_localization()
# Load Trove app
# Paste file needs absolute path
config_file = os.path.realpath('etc/trove/trove.conf.test')
# 'etc/trove/test-api-paste.ini'
app = initialize_trove(config_file)
# Initialize sqlite database.
initialize_database()
# Swap out WSGI, httplib, and other components with test doubles.
initialize_fakes(app)
# Initialize the test configuration.
test_config_file, repl = parse_args_for_test_config()
CONFIG.load_from_file(test_config_file)
import_func()
from trove.tests.util import event_simulator
event_simulator.run_main(functools.partial(run_tests, repl))
except Exception as e: except Exception as e:
# Printing the error manually like this is necessary due to oddities
# with sys.excepthook.
print("Run tests failed: %s" % e) print("Run tests failed: %s" % e)
traceback.print_exc() traceback.print_exc()
raise raise
proboscis.TestProgram().run_and_exit()
if __name__ == "__main__":
main(import_tests)

View File

@ -207,7 +207,7 @@ class RateLimitingMiddleware(base_wsgi.TroveMiddleware):
delay, error = self._limiter.check_for_delay(verb, url, tenant_id) delay, error = self._limiter.check_for_delay(verb, url, tenant_id)
if delay: if delay and self.enabled():
msg = _("This request was rate-limited.") msg = _("This request was rate-limited.")
retry = time.time() + delay retry = time.time() + delay
return base_wsgi.OverLimitFault(msg, error, retry) return base_wsgi.OverLimitFault(msg, error, retry)
@ -216,6 +216,9 @@ class RateLimitingMiddleware(base_wsgi.TroveMiddleware):
return self.application return self.application
def enabled(self):
return True
class Limiter(object): class Limiter(object):
""" """

View File

@ -33,6 +33,7 @@ from trove.tests.util.users import Requirements
from trove.tests.api.instances import instance_info from trove.tests.api.instances import instance_info
from trove.tests.api.instances import VOLUME_SUPPORT from trove.tests.api.instances import VOLUME_SUPPORT
CONF = cfg.CONF CONF = cfg.CONF
@ -52,15 +53,24 @@ class TestBase(object):
volume, [], []) volume, [], [])
return result.id return result.id
def wait_for_instance_status(self, instance_id, status="ACTIVE"): def wait_for_instance_status(self, instance_id, status="ACTIVE",
acceptable_states=None):
if acceptable_states:
acceptable_states.append(status)
def assert_state(instance):
if acceptable_states:
assert_true(instance.status in acceptable_states,
"Invalid status: %s" % instance.status)
return instance
poll_until(lambda: self.dbaas.instances.get(instance_id), poll_until(lambda: self.dbaas.instances.get(instance_id),
lambda instance: instance.status == status, lambda instance: assert_state(instance).status == status,
time_out=3, sleep_time=1) time_out=30, sleep_time=1)
def wait_for_instance_task_status(self, instance_id, description): def wait_for_instance_task_status(self, instance_id, description):
poll_until(lambda: self.dbaas.management.show(instance_id), poll_until(lambda: self.dbaas.management.show(instance_id),
lambda instance: instance.task_description == description, lambda instance: instance.task_description == description,
time_out=3, sleep_time=1) time_out=30, sleep_time=1)
def is_instance_deleted(self, instance_id): def is_instance_deleted(self, instance_id):
while True: while True:

View File

@ -27,6 +27,7 @@ from trove.tests.util import create_dbaas_client
from troveclient.compat import exceptions from troveclient.compat import exceptions
from datetime import datetime from datetime import datetime
from trove.tests.util.users import Users from trove.tests.util.users import Users
from trove.tests.fakes import limits as fake_limits
GROUP = "dbaas.api.limits" GROUP = "dbaas.api.limits"
DEFAULT_RATE = 200 DEFAULT_RATE = 200
@ -35,6 +36,15 @@ DEFAULT_MAX_INSTANCES = 55
DEFAULT_MAX_BACKUPS = 5 DEFAULT_MAX_BACKUPS = 5
def ensure_limits_are_not_faked(func):
def _cd(*args, **kwargs):
fake_limits.ENABLED = True
try:
return func(*args, **kwargs)
finally:
fake_limits.ENABLED = False
@test(groups=[GROUP]) @test(groups=[GROUP])
class Limits(object): class Limits(object):
@ -81,6 +91,7 @@ class Limits(object):
return d return d
@test @test
@ensure_limits_are_not_faked
def test_limits_index(self): def test_limits_index(self):
"""Test_limits_index.""" """Test_limits_index."""
@ -101,6 +112,7 @@ class Limits(object):
assert_true(d[k].nextAvailable is not None) assert_true(d[k].nextAvailable is not None)
@test @test
@ensure_limits_are_not_faked
def test_limits_get_remaining(self): def test_limits_get_remaining(self):
"""Test_limits_get_remaining.""" """Test_limits_get_remaining."""
@ -121,6 +133,7 @@ class Limits(object):
assert_true(get.nextAvailable is not None) assert_true(get.nextAvailable is not None)
@test @test
@ensure_limits_are_not_faked
def test_limits_exception(self): def test_limits_exception(self):
"""Test_limits_exception.""" """Test_limits_exception."""

View File

@ -0,0 +1,26 @@
# Copyright 2014 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from trove.common import limits
ENABLED = False
class FakeRateLimitingMiddleware(limits.RateLimitingMiddleware):
def enabled(self):
return ENABLED

View File

@ -297,7 +297,7 @@ class FakeServers(object):
"available.") "available.")
server.schedule_status("ACTIVE", 1) server.schedule_status("ACTIVE", 1)
LOG.info(_("FAKE_SERVERS_DB : %s") % str(FAKE_SERVERS_DB)) LOG.info("FAKE_SERVERS_DB : %s" % str(FAKE_SERVERS_DB))
return server return server
def _get_volumes_from_bdm(self, block_device_mapping): def _get_volumes_from_bdm(self, block_device_mapping):
@ -734,6 +734,9 @@ class FakeSecurityGroups(object):
self.securityGroups[secGrp.get_id()] = secGrp self.securityGroups[secGrp.get_id()] = secGrp
return secGrp return secGrp
def delete(self, group_id):
pass
def list(self): def list(self):
pass pass

View File

@ -0,0 +1,53 @@
# Copyright 2014 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import eventlet
from trove.taskmanager import api
from trove.taskmanager.manager import Manager
class FakeApi(api.API):
def __init__(self, context):
self.context = context
def make_msg(self, method_name, *args, **kwargs):
return {"name": method_name, "args": args, "kwargs": kwargs}
def call(self, context, msg):
manager, method = self.get_tm_method(msg['name'])
return method(manager, context, *msg['args'], **msg['kwargs'])
def cast(self, context, msg):
manager, method = self.get_tm_method(msg['name'])
def func():
method(manager, context, *msg['args'], **msg['kwargs'])
eventlet.spawn_after(0.1, func)
def get_tm_method(self, method_name):
manager = Manager()
method = getattr(Manager, method_name)
return manager, method
def monkey_patch():
api.API = FakeApi
def fake_load(context, manager=None):
return FakeApi(context)
api.load = fake_load

View File

@ -1,6 +1,4 @@
# Copyright 2013 OpenStack Foundation # Copyright 2014 Rackspace Hosting
# Copyright 2013 Rackspace Hosting
# Copyright 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved. # All Rights Reserved.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -18,116 +16,242 @@
""" """
Simulates time itself to make the fake mode tests run even faster. Simulates time itself to make the fake mode tests run even faster.
Specifically, this forces all various threads of execution to run one at a time
based on when they would have been scheduled using the various eventlet spawn
functions. Because only one thing is running at a given time, it eliminates
race conditions that would normally be present from testing multi-threaded
scenarios. It also means that the simulated time.sleep does not actually have
to sit around for the designated time, which greatly speeds up the time it
takes to run the tests.
"""
import eventlet
from eventlet import spawn as true_spawn
from eventlet.event import Event
from eventlet.semaphore import Semaphore
class Coroutine(object):
"""
This class simulates a coroutine, which is ironic, as greenlet actually
*is* a coroutine. But trying to use greenlet here gives nasty results
since eventlet thoroughly monkey-patches things, making it difficult
to run greenlet on its own.
Essentially think of this as a wrapper for eventlet's threads which has a
run and sleep function similar to old school coroutines, meaning it won't
start until told and when asked to sleep it won't wake back up without
permission.
""" """
from proboscis.asserts import fail ALL = []
from trove.openstack.common import log as logging
from trove.common import exception def __init__(self, func, *args, **kwargs):
self.my_sem = Semaphore(0) # This is held by the thread as it runs.
self.caller_sem = None
self.dead = False
started = Event()
self.id = 5
self.ALL.append(self)
def go():
self.id = eventlet.corolocal.get_ident()
started.send(True)
self.my_sem.acquire(blocking=True, timeout=None)
try:
func(*args, **kwargs)
# except Exception as e:
# print("Exception in coroutine! %s" % e)
finally:
self.dead = True
self.caller_sem.release() # Relinquish control back to caller.
for i in range(len(self.ALL)):
if self.ALL[i].id == self.id:
del self.ALL[i]
break
true_spawn(go)
started.wait()
@classmethod
def get_current(cls):
"""Finds the coroutine associated with the thread which calls it."""
return cls.get_by_id(eventlet.corolocal.get_ident())
@classmethod
def get_by_id(cls, id):
for cr in cls.ALL:
if cr.id == id:
return cr
raise RuntimeError("Coroutine with id %s not found!" % id)
def sleep(self):
"""Puts the coroutine to sleep until run is called again.
This should only be called by the thread which owns this object.
"""
# Only call this from it's own thread.
assert eventlet.corolocal.get_ident() == self.id
self.caller_sem.release() # Relinquish control back to caller.
self.my_sem.acquire(blocking=True, timeout=None)
def run(self):
"""Starts up the thread. Should be called from a different thread."""
# Don't call this from the thread which it represents.
assert eventlet.corolocal.get_ident() != self.id
self.caller_sem = Semaphore(0)
self.my_sem.release()
self.caller_sem.acquire() # Wait for it to finish.
LOG = logging.getLogger(__name__)
allowable_empty_sleeps = 0
pending_events = []
sleep_entrance_count = 0 sleep_entrance_count = 0
main_greenlet = None
def event_simulator_spawn_after(time_from_now_in_seconds, func, *args, **kw):
"""Fakes events without doing any actual waiting."""
def __cb():
func(*args, **kw)
pending_events.append({"time": time_from_now_in_seconds, "func": __cb})
def event_simulator_spawn(func, *args, **kw): fake_threads = []
event_simulator_spawn_after(0, func, *args, **kw)
def event_simulator_sleep(time_to_sleep): allowable_empty_sleeps = 1
"""Simulates waiting for an event. sleep_allowance = allowable_empty_sleeps
This is used to monkey patch the sleep methods, so that no actually waiting
occurs but functions which would have run as threads are executed.
This function will also raise an assertion failure if there were no pending def other_threads_are_active():
events ready to run. If this happens there are two possibilities: """Returns True if concurrent activity is being simulated.
1. The test code (or potentially code in Trove task manager) is
sleeping even though no action is taking place in
another thread.
2. The test code (or task manager code) is sleeping waiting for a
condition that will never be met because the thread it was waiting
on experienced an error or did not finish successfully.
A good example of this second case is when a bug in task manager causes the
create instance method to fail right away, but the test code tries to poll
the instance's status until it gets rate limited. That makes finding the
real error a real hassle. Thus it makes more sense to raise an exception
whenever the app seems to be napping for no reason.
Specifically, this means there is a fake thread in action other than the
"pulse" thread and the main test thread.
""" """
global pending_events return len(fake_threads) >= 2
global allowable_empty_sleeps
if len(pending_events) == 0:
allowable_empty_sleeps -= 1
if allowable_empty_sleeps < 0:
fail("Trying to sleep when no events are pending.")
global sleep_entrance_count
sleep_entrance_count += 1
time_to_sleep = float(time_to_sleep)
run_once = False # Ensure simulator runs even if the sleep time is zero. def fake_sleep(time_to_sleep):
while not run_once or time_to_sleep > 0: """Simulates sleep.
run_once = True
itr_sleep = 0.5 Puts the coroutine which calls it to sleep. If a coroutine object is not
for i in range(len(pending_events)): associated with the caller this will fail.
event = pending_events[i] """
event["time"] = event["time"] - itr_sleep global sleep_allowance
if event["func"] is not None and event["time"] < 0: sleep_allowance -= 1
# Call event, but first delete it so this function can be if not other_threads_are_active():
# reentrant. if sleep_allowance < -1:
func = event["func"] raise RuntimeError("Sleeping for no reason.")
event["func"] = None else:
try: return # Forgive the thread for calling this for one time.
func() sleep_allowance = allowable_empty_sleeps
except Exception:
LOG.exception("Simulated event error.") cr = Coroutine.get_current()
time_to_sleep -= itr_sleep for ft in fake_threads:
sleep_entrance_count -= 1 if ft['greenlet'].id == cr.id:
if sleep_entrance_count < 1: ft['next_sleep_time'] = time_to_sleep
# Clear out old events
pending_events = [event for event in pending_events cr.sleep()
if event["func"] is not None]
def fake_poll_until(retriever, condition=lambda value: value, def fake_poll_until(retriever, condition=lambda value: value,
sleep_time=1, time_out=None): sleep_time=1, time_out=None):
"""Retrieves object until it passes condition, then returns it. """Fakes out poll until."""
from trove.common import exception
If time_out_limit is passed in, PollTimeOut will be raised once that
amount of time is eclipsed.
"""
slept_time = 0 slept_time = 0
while True: while True:
resource = retriever() resource = retriever()
if condition(resource): if condition(resource):
return resource return resource
event_simulator_sleep(sleep_time) fake_sleep(sleep_time)
slept_time += sleep_time slept_time += sleep_time
if time_out and slept_time >= time_out: if time_out and slept_time >= time_out:
raise exception.PollTimeOut() raise exception.PollTimeOut()
def run_main(func):
"""Runs the given function as the initial thread of the event simulator."""
global main_greenlet
main_greenlet = Coroutine(main_loop)
fake_spawn(0, func)
main_greenlet.run()
def main_loop():
"""The coroutine responsible for calling each "fake thread."
The Coroutine which calls this is the only one that won't end up being
associated with the fake_threads list. The reason is this loop needs to
wait on whatever thread is running, meaning it has to be a Coroutine as
well.
"""
while len(fake_threads) > 0:
pulse(0.1)
def fake_spawn_n(func, *args, **kw):
fake_spawn(0, func, *args, **kw)
def fake_spawn(time_from_now_in_seconds, func, *args, **kw):
"""Fakes eventlet's spawn function by adding a fake thread."""
def thread_start():
#fake_sleep(time_from_now_in_seconds)
return func(*args, **kw)
cr = Coroutine(thread_start)
fake_threads.append({'sleep': time_from_now_in_seconds,
'greenlet': cr,
'name': str(func)})
def pulse(seconds):
"""
Runs the event simulator for the amount of simulated time denoted by
"seconds".
"""
index = 0
while index < len(fake_threads):
t = fake_threads[index]
t['sleep'] -= seconds
if t['sleep'] <= 0:
t['sleep'] = 0
t['next_sleep_time'] = None
t['greenlet'].run()
sleep_time = t['next_sleep_time']
if sleep_time is None or isinstance(sleep_time, tuple):
del fake_threads[index]
index -= 1
else:
t['sleep'] = sleep_time
index += 1
def wait_until_all_activity_stops():
"""In fake mode, wait for all simulated events to chill out.
This can be useful in situations where you need simulated activity (such
as calls running in TaskManager) to "bleed out" and finish before running
another test.
"""
if main_greenlet is None:
return
while other_threads_are_active():
fake_sleep(1)
def monkey_patch(): def monkey_patch():
"""
Changes global functions such as time.sleep, eventlet.spawn* and others
to their event_simulator equivalents.
"""
import time import time
time.sleep = event_simulator_sleep time.sleep = fake_sleep
import eventlet import eventlet
from eventlet import greenthread from eventlet import greenthread
eventlet.sleep = event_simulator_sleep eventlet.sleep = fake_sleep
greenthread.sleep = event_simulator_sleep greenthread.sleep = fake_sleep
eventlet.spawn_after = event_simulator_spawn_after eventlet.spawn_after = fake_spawn
eventlet.spawn_n = event_simulator_spawn
eventlet.spawn = NotImplementedError def raise_error():
raise RuntimeError("Illegal operation!")
eventlet.spawn_n = fake_spawn_n
eventlet.spawn = raise_error
from trove.common import utils from trove.common import utils
utils.poll_until = fake_poll_until utils.poll_until = fake_poll_until