Save raw task results in chunks

* Split task results into chunks with size
  configured by raw_result_chunk_size parameter
* Removed load_finished event as not used anywhere

Change-Id: I09841d02c60ab66eebd851bdf1c0a6d5f9e5a7be
This commit is contained in:
Anton Studenov 2016-12-28 21:07:24 +03:00
parent bb336b0a11
commit 1587efbcc1
10 changed files with 220 additions and 40 deletions

View File

@ -111,6 +111,10 @@
# value)
#openstack_client_http_timeout = 180.0
# Size of raw result chunk in iterations (integer value)
# Minimum value: 1
#raw_result_chunk_size = 1000
[benchmark]
@ -242,12 +246,12 @@
# the status. (floating point value)
#magnum_cluster_create_prepoll_delay = 5.0
# Time(in sec) to wait for magnum cluster to be created. (floating point
# value)
# Time(in sec) to wait for magnum cluster to be created. (floating
# point value)
#magnum_cluster_create_timeout = 1200.0
# Time interval(in sec) between checks when waiting for cluster creation.
# (floating point value)
# Time interval(in sec) between checks when waiting for cluster
# creation. (floating point value)
#magnum_cluster_create_poll_interval = 1.0
# Delay between creating Manila share and polling for its status.

View File

@ -556,6 +556,17 @@
failure_rate:
max: 0
-
args:
sleep: 0
runner:
type: "constant"
times: 4500
concurrency: 20
sla:
failure_rate:
max: 0
Dummy.dummy_exception:
-
args:

View File

@ -257,15 +257,17 @@ def workload_create(task_uuid, subtask_uuid, key):
return get_impl().workload_create(task_uuid, subtask_uuid, key)
def workload_data_create(task_uuid, workload_uuid, data):
def workload_data_create(task_uuid, workload_uuid, chunk_order, data):
"""Create a workload data.
:param task_uuid: string with UUID of Task instance.
:param workload_uuid: string with UUID of Workload instance.
:param chunk_order: ordinal index of workload data
:param data: dict with record values on the workload data.
:returns: a dict with data on the workload data.
"""
return get_impl().workload_data_create(task_uuid, workload_uuid, data)
return get_impl().workload_data_create(task_uuid, workload_uuid,
chunk_order, data)
def workload_set_results(workload_uuid, data):

View File

@ -242,7 +242,10 @@ class Connection(object):
"verification_log": json.dumps(task.validation_result)
}
def _make_old_task_result(self, workload, workload_data):
def _make_old_task_result(self, workload, workload_data_list):
raw_data = [data
for workload_data in workload_data_list
for data in workload_data.chunk_data["raw"]]
return {
"id": workload.id,
"task_uuid": workload.task_uuid,
@ -260,7 +263,7 @@ class Connection(object):
}
},
"data": {
"raw": workload_data.chunk_data["raw"],
"raw": raw_data,
"load_duration": workload.load_duration,
"full_duration": workload.full_duration,
"sla": workload.sla_results["sla"],
@ -268,6 +271,11 @@ class Connection(object):
}
}
def _task_workload_data_get_all(self, workload_uuid):
return (self.model_query(models.WorkloadData).
filter_by(workload_uuid=workload_uuid).
order_by(models.WorkloadData.chunk_order.asc()))
# @db_api.serialize
def task_get(self, uuid):
task = self._task_get(uuid)
@ -407,13 +415,11 @@ class Connection(object):
filter_by(task_uuid=uuid).all())
for workload in workloads:
workload_data = (self.model_query(models.WorkloadData).
filter_by(task_uuid=uuid,
workload_uuid=workload.uuid).
first())
workload_data_list = self._task_workload_data_get_all(
workload.uuid)
results.append(
self._make_old_task_result(workload, workload_data))
self._make_old_task_result(workload, workload_data_list))
return results
@ -451,7 +457,8 @@ class Connection(object):
return workload
@db_api.serialize
def workload_data_create(self, task_uuid, workload_uuid, data):
def workload_data_create(self, task_uuid, workload_uuid, chunk_order,
data):
workload_data = models.WorkloadData(task_uuid=task_uuid,
workload_uuid=workload_uuid)
@ -485,7 +492,7 @@ class Connection(object):
workload_data.update({
"task_uuid": task_uuid,
"workload_uuid": workload_uuid,
"chunk_order": 0,
"chunk_order": chunk_order,
"iteration_count": iter_count,
"failed_iteration_count": failed_iter_count,
"chunk_data": {"raw": raw_data},
@ -503,10 +510,11 @@ class Connection(object):
workload = self.model_query(models.Workload).filter_by(
uuid=workload_uuid).first()
workload_data = self.model_query(models.WorkloadData).filter_by(
workload_uuid=workload_uuid).first()
workload_data_list = self._task_workload_data_get_all(workload.uuid)
raw_data = workload_data.chunk_data.get("raw", [])
raw_data = [raw
for workload_data in workload_data_list
for raw in workload_data.chunk_data["raw"]]
iter_count = len(raw_data)
failed_iter_count = 0

View File

@ -569,9 +569,10 @@ class Workload(object):
def __getitem__(self, key):
return self.workload[key]
def add_workload_data(self, workload_data):
def add_workload_data(self, chunk_order, workload_data):
db.workload_data_create(self.workload["task_uuid"],
self.workload["uuid"], workload_data)
self.workload["uuid"], chunk_order,
workload_data)
def set_results(self, data):
db.workload_set_results(self.workload["uuid"], data)

View File

@ -34,13 +34,15 @@ from rally.plugins.openstack.scenarios.vm import utils as vm_utils
from rally.plugins.openstack.scenarios.watcher import utils as watcher_utils
from rally.plugins.openstack.verification.tempest import config as tempest_conf
from rally.plugins.openstack.wrappers import glance as glance_utils
from rally.task import engine
def list_opts():
return [
("DEFAULT",
itertools.chain(logging.DEBUG_OPTS,
osclients.OSCLIENTS_OPTS)),
osclients.OSCLIENTS_OPTS,
engine.TASK_ENGINE_OPTS)),
("benchmark",
itertools.chain(cinder_utils.CINDER_BENCHMARK_OPTS,
ec2_utils.EC2_BENCHMARK_OPTS,

View File

@ -20,6 +20,7 @@ import time
import traceback
import jsonschema
from oslo_config import cfg
import six
from rally.common.i18n import _
@ -40,6 +41,14 @@ from rally.task import sla
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
TASK_ENGINE_OPTS = [
cfg.IntOpt("raw_result_chunk_size", default=1000, min=1,
help="Size of raw result chunk in iterations"),
]
CONF.register_opts(TASK_ENGINE_OPTS)
class ResultConsumer(object):
"""ResultConsumer class stores results from ScenarioRunner, checks SLA.
@ -69,6 +78,7 @@ class ResultConsumer(object):
self.runner = runner
self.load_started_at = float("inf")
self.load_finished_at = 0
self.workload_data_count = 0
self.sla_checker = sla.SLAChecker(key["kw"])
self.hook_executor = hook.HookExecutor(key["kw"], self.task)
@ -109,6 +119,17 @@ class ResultConsumer(object):
self.task.update_status(
consts.TaskStatus.SOFT_ABORTING)
task_aborted = True
# save results chunks
chunk_size = CONF.raw_result_chunk_size
while len(self.results) >= chunk_size:
results_chunk = self.results[:chunk_size]
self.results = self.results[chunk_size:]
results_chunk.sort(key=lambda x: x["timestamp"])
self.workload.add_workload_data(self.workload_data_count,
{"raw": results_chunk})
self.workload_data_count += 1
elif self.is_done.isSet():
break
else:
@ -136,9 +157,6 @@ class ResultConsumer(object):
self.task["uuid"]) == consts.TaskStatus.ABORTED:
self.sla_checker.set_aborted_manually()
# NOTE(boris-42): Sort in order of starting instead of order of ending
self.results.sort(key=lambda x: x["timestamp"])
load_duration = max(self.load_finished_at - self.load_started_at, 0)
LOG.info("Load duration is: %s" % utils.format_float_to_str(
@ -153,12 +171,17 @@ class ResultConsumer(object):
"full_duration": self.finish - self.start,
"sla": self.sla_checker.results(),
}
self.runner.send_event(type="load_finished", value=results)
if "hooks" in self.key["kw"]:
self.event_thread.join()
results["hooks"] = self.hook_executor.results()
self.workload.add_workload_data({"raw": self.results})
if self.results:
# NOTE(boris-42): Sort in order of starting
# instead of order of ending
self.results.sort(key=lambda x: x["timestamp"])
self.workload.add_workload_data(self.workload_data_count,
{"raw": self.results})
self.workload.set_results(results)
@staticmethod

View File

@ -254,7 +254,7 @@ class TasksTestCase(test.DBTestCase):
}
subtask = db.subtask_create(task_id, title="foo")
workload = db.workload_create(task_id, subtask["uuid"], key)
db.workload_data_create(task_id, workload["uuid"], {"raw": []})
db.workload_data_create(task_id, workload["uuid"], 0, {"raw": []})
db.workload_set_results(workload["uuid"], data)
res = db.task_result_get_all_by_uuid(task_id)
@ -311,7 +311,7 @@ class TasksTestCase(test.DBTestCase):
data["sla"][0] = {"success": True}
subtask = db.subtask_create(task_id, title="foo")
workload = db.workload_create(task_id, subtask["uuid"], key)
db.workload_data_create(task_id, workload["uuid"], {"raw": []})
db.workload_data_create(task_id, workload["uuid"], 0, {"raw": []})
db.workload_set_results(workload["uuid"], data)
for task_id in (task1, task2):
@ -355,7 +355,8 @@ class TasksTestCase(test.DBTestCase):
subtask = db.subtask_create(task1["uuid"], title="foo")
workload = db.workload_create(task1["uuid"], subtask["uuid"], key)
db.workload_data_create(task1["uuid"], workload["uuid"], {"raw": []})
db.workload_data_create(
task1["uuid"], workload["uuid"], 0, {"raw": []})
db.workload_set_results(workload["uuid"], data)
task1_full = db.task_get_detailed(task1["uuid"])
@ -403,7 +404,8 @@ class TasksTestCase(test.DBTestCase):
subtask = db.subtask_create(task1["uuid"], title="foo")
workload = db.workload_create(task1["uuid"], subtask["uuid"], key)
db.workload_data_create(task1["uuid"], workload["uuid"], {"raw": []})
db.workload_data_create(
task1["uuid"], workload["uuid"], 0, {"raw": []})
db.workload_set_results(workload["uuid"], data)
task1_full = db.task_get_detailed_last()
@ -463,7 +465,7 @@ class TasksTestCase(test.DBTestCase):
subtask = db.subtask_create(task_id, title="foo")
workload = db.workload_create(task_id, subtask["uuid"], key)
db.workload_data_create(task_id, workload["uuid"], raw_data)
db.workload_data_create(task_id, workload["uuid"], 0, raw_data)
db.workload_set_results(workload["uuid"], data)
res = db.task_result_get_all_by_uuid(task_id)
@ -471,6 +473,80 @@ class TasksTestCase(test.DBTestCase):
self.assertEqual(raw_data["raw"], res[0]["data"]["raw"])
self.assertEqual(key, res[0]["key"])
def test_task_multiple_raw_result_create(self):
task_id = self._create_task()["uuid"]
key = {
"name": "atata",
"pos": 0,
"kw": {
"args": {"a": "A"},
"context": {"c": "C"},
"sla": {"s": "S"},
"runner": {"r": "R", "type": "T"},
"hooks": [],
}
}
subtask = db.subtask_create(task_id, title="foo")
workload = db.workload_create(task_id, subtask["uuid"], key)
db.workload_data_create(task_id, workload["uuid"], 0, {
"raw": [
{"error": "anError", "timestamp": 10, "duration": 1},
{"duration": 1, "timestamp": 10, "duration": 1},
{"duration": 2, "timestamp": 10, "duration": 1},
{"duration": 3, "timestamp": 10, "duration": 1},
],
})
db.workload_data_create(task_id, workload["uuid"], 1, {
"raw": [
{"error": "anError2", "timestamp": 10, "duration": 1},
{"duration": 6, "timestamp": 10, "duration": 1},
{"duration": 5, "timestamp": 10, "duration": 1},
{"duration": 4, "timestamp": 10, "duration": 1},
],
})
db.workload_data_create(task_id, workload["uuid"], 2, {
"raw": [
{"duration": 7, "timestamp": 10, "duration": 1},
{"duration": 8, "timestamp": 10, "duration": 1},
],
})
db.workload_set_results(workload["uuid"], {
"sla": [{"success": True}],
"load_duration": 13,
"full_duration": 42
})
res = db.task_result_get_all_by_uuid(task_id)
self.assertEqual(len(res), 1)
self.assertEqual(res[0]["key"], key)
self.assertEqual(res[0]["data"], {
"raw": [
{"error": "anError", "timestamp": 10, "duration": 1},
{"duration": 1, "timestamp": 10, "duration": 1},
{"duration": 2, "timestamp": 10, "duration": 1},
{"duration": 3, "timestamp": 10, "duration": 1},
{"error": "anError2", "timestamp": 10, "duration": 1},
{"duration": 6, "timestamp": 10, "duration": 1},
{"duration": 5, "timestamp": 10, "duration": 1},
{"duration": 4, "timestamp": 10, "duration": 1},
{"duration": 7, "timestamp": 10, "duration": 1},
{"duration": 8, "timestamp": 10, "duration": 1},
],
"sla": [{"success": True}],
"hooks": [],
"load_duration": 13,
"full_duration": 42
})
db.task_delete(task_id)
res = db.task_result_get_all_by_uuid(task_id)
self.assertEqual(len(res), 0)
class SubtaskTestCase(test.DBTestCase):
def setUp(self):
@ -544,7 +620,7 @@ class WorkloadTestCase(test.DBTestCase):
}
workload = db.workload_create(self.task_uuid, self.subtask_uuid, key)
db.workload_data_create(self.task_uuid, workload["uuid"], raw_data)
db.workload_data_create(self.task_uuid, workload["uuid"], 0, raw_data)
workload = db.workload_set_results(workload["uuid"], data)
self.assertEqual("atata", workload["name"])
self.assertEqual(0, workload["position"])
@ -576,7 +652,6 @@ class WorkloadTestCase(test.DBTestCase):
"runner": {"r": "R", "type": "T"}
}
}
raw_data = {"raw": []}
data = {
"sla": [
{"s": "S", "success": False},
@ -588,7 +663,6 @@ class WorkloadTestCase(test.DBTestCase):
}
workload = db.workload_create(self.task_uuid, self.subtask_uuid, key)
db.workload_data_create(self.task_uuid, workload["uuid"], raw_data)
workload = db.workload_set_results(workload["uuid"], data)
self.assertEqual("atata", workload["name"])
self.assertEqual(0, workload["position"])
@ -633,7 +707,7 @@ class WorkloadDataTestCase(test.DBTestCase):
]
}
workload_data = db.workload_data_create(self.task_uuid,
self.workload_uuid, data)
self.workload_uuid, 0, data)
self.assertEqual(3, workload_data["iteration_count"])
self.assertEqual(1, workload_data["failed_iteration_count"])
self.assertEqual(dt.datetime.fromtimestamp(1),
@ -649,7 +723,7 @@ class WorkloadDataTestCase(test.DBTestCase):
mock_time.return_value = 10
data = {"raw": []}
workload_data = db.workload_data_create(self.task_uuid,
self.workload_uuid, data)
self.workload_uuid, 0, data)
self.assertEqual(0, workload_data["iteration_count"])
self.assertEqual(0, workload_data["failed_iteration_count"])
self.assertEqual(dt.datetime.fromtimestamp(10),

View File

@ -385,9 +385,10 @@ class WorkloadTestCase(test.TestCase):
mock_workload_create.return_value = self.workload
workload = objects.Workload("uuid1", "uuid2", {"bar": "baz"})
workload = workload.add_workload_data({"data": "foo"})
workload = workload.add_workload_data(0, {"data": "foo"})
mock_workload_data_create.assert_called_once_with(
self.workload["task_uuid"], self.workload["uuid"], {"data": "foo"})
self.workload["task_uuid"], self.workload["uuid"],
0, {"data": "foo"})
@mock.patch("rally.common.objects.task.db.workload_set_results")
@mock.patch("rally.common.objects.task.db.workload_create")

View File

@ -550,7 +550,7 @@ class ResultConsumerTestCase(test.TestCase):
key, task, subtask, workload, runner, False):
pass
workload.add_workload_data.assert_called_once_with({"raw": []})
self.assertFalse(workload.add_workload_data.called)
workload.set_results.assert_called_once_with({
"full_duration": 1,
"sla": mock_sla_results,
@ -671,6 +671,60 @@ class ResultConsumerTestCase(test.TestCase):
mock_sla_instance.set_unexpected_failure.assert_has_calls(
[mock.call(exc)])
@mock.patch("rally.task.engine.CONF")
@mock.patch("rally.common.objects.Task.get_status")
@mock.patch("rally.task.engine.ResultConsumer.wait_and_abort")
@mock.patch("rally.task.sla.SLAChecker")
def test_consume_results_chunked(
self, mock_sla_checker, mock_result_consumer_wait_and_abort,
mock_task_get_status, mock_conf):
mock_conf.raw_result_chunk_size = 2
mock_sla_instance = mock.MagicMock()
mock_sla_checker.return_value = mock_sla_instance
mock_task_get_status.return_value = consts.TaskStatus.RUNNING
key = {"kw": {"fake": 2}, "name": "fake", "pos": 0}
task = mock.MagicMock(spec=objects.Task)
subtask = mock.Mock(spec=objects.Subtask)
workload = mock.Mock(spec=objects.Workload)
runner = mock.MagicMock()
results = [
[{"duration": 1, "timestamp": 3},
{"duration": 2, "timestamp": 2},
{"duration": 3, "timestamp": 3}],
[{"duration": 4, "timestamp": 2},
{"duration": 5, "timestamp": 3}],
[{"duration": 6, "timestamp": 2}],
[{"duration": 7, "timestamp": 1}],
]
runner.result_queue = collections.deque(results)
runner.event_queue = collections.deque()
with engine.ResultConsumer(
key, task, subtask, workload, runner, False) as consumer_obj:
pass
mock_sla_instance.add_iteration.assert_has_calls([
mock.call({"duration": 1, "timestamp": 3}),
mock.call({"duration": 2, "timestamp": 2}),
mock.call({"duration": 3, "timestamp": 3}),
mock.call({"duration": 4, "timestamp": 2}),
mock.call({"duration": 5, "timestamp": 3}),
mock.call({"duration": 6, "timestamp": 2}),
mock.call({"duration": 7, "timestamp": 1})])
self.assertEqual([{"duration": 7, "timestamp": 1}],
consumer_obj.results)
workload.add_workload_data.assert_has_calls([
mock.call(0, {"raw": [{"duration": 2, "timestamp": 2},
{"duration": 1, "timestamp": 3}]}),
mock.call(1, {"raw": [{"duration": 4, "timestamp": 2},
{"duration": 3, "timestamp": 3}]}),
mock.call(2, {"raw": [{"duration": 6, "timestamp": 2},
{"duration": 5, "timestamp": 3}]}),
mock.call(3, {"raw": [{"duration": 7, "timestamp": 1}]})])
@mock.patch("rally.task.engine.LOG")
@mock.patch("rally.task.hook.HookExecutor")
@mock.patch("rally.task.engine.time.time")
@ -719,7 +773,7 @@ class ResultConsumerTestCase(test.TestCase):
mock.call(event_type="iteration", value=3)
])
workload.add_workload_data.assert_called_once_with({"raw": []})
self.assertFalse(workload.add_workload_data.called)
workload.set_results.assert_called_once_with({
"full_duration": 1,
"sla": mock_sla_results,