From b4591df9e9af8052676fa2ad1bcbf0362ebe176f Mon Sep 17 00:00:00 2001 From: Sean Dague Date: Thu, 12 Jun 2014 17:47:41 -0400 Subject: [PATCH] have realtime engine only search recent indexes Elastic Recheck is really 2 things, real time searching, and bulk offline categorization. While the bulk categorization needs to look over the entire dataset, the real time portion is really deadline oriented. So only cares about the last hour's worth of data. As such we really don't need to search *all* the indexes in ES, but only the most recent one (and possibly the one before that if we are near rotation). Implement this via a recent= parameter for our search feature. If set to true then we specify the most recently logstash index. If it turns out that we're within an hour of rotation, also search the one before that. Adjust all the queries the bot uses to be recent=True. This will hopefully reduce the load generated by the bot on the ES cluster. Change-Id: I0dfc295dd9b381acb67f192174edd6fdde06f24c --- elastic_recheck/bot.py | 5 ++++- elastic_recheck/elasticRecheck.py | 9 +++++---- elastic_recheck/results.py | 20 ++++++++++++++++++-- 3 files changed, 27 insertions(+), 7 deletions(-) diff --git a/elastic_recheck/bot.py b/elastic_recheck/bot.py index f531f5db..2c967662 100755 --- a/elastic_recheck/bot.py +++ b/elastic_recheck/bot.py @@ -196,7 +196,10 @@ class RecheckWatch(threading.Thread): for job in event.failed_jobs: job.bugs = set(classifier.classify( - event.change, event.rev, job.build_short_uuid)) + event.change, + event.rev, + job.build_short_uuid, + recent=True)) if not event.get_all_bugs(): self._read(event) else: diff --git a/elastic_recheck/elasticRecheck.py b/elastic_recheck/elasticRecheck.py index 4d36fb6f..2b05a6c1 100644 --- a/elastic_recheck/elasticRecheck.py +++ b/elastic_recheck/elasticRecheck.py @@ -212,7 +212,7 @@ class Stream(object): def _job_console_uploaded(self, change, patch, name, build_short_uuid): query = qb.result_ready(change, patch, name, build_short_uuid) - r = self.es.search(query, size='10') + r = self.es.search(query, size='10', recent=True) if len(r) == 0: msg = ("Console logs not ready for %s %s,%s,%s" % (name, change, patch, build_short_uuid)) @@ -223,7 +223,7 @@ class Stream(object): def _has_required_files(self, change, patch, name, build_short_uuid): query = qb.files_ready(change, patch, name, build_short_uuid) - r = self.es.search(query, size='80') + r = self.es.search(query, size='80', recent=True) files = [x['term'] for x in r.terms] required = required_files(name) missing_files = [x for x in required if x not in files] @@ -368,7 +368,8 @@ class Classifier(): es_query = qb.generic(query, facet=facet) return self.es.search(es_query, size=size) - def classify(self, change_number, patch_number, build_short_uuid): + def classify(self, change_number, patch_number, + build_short_uuid, recent=False): """Returns either empty list or list with matched bugs.""" self.log.debug("Entering classify") #Reload each time @@ -380,7 +381,7 @@ class Classifier(): % x['bug']) query = qb.single_patch(x['query'], change_number, patch_number, build_short_uuid) - results = self.es.search(query, size='10') + results = self.es.search(query, size='10', recent=recent) if len(results) > 0: bug_matches.append(x['bug']) return bug_matches diff --git a/elastic_recheck/results.py b/elastic_recheck/results.py index 17576c73..e17c85c2 100644 --- a/elastic_recheck/results.py +++ b/elastic_recheck/results.py @@ -32,7 +32,7 @@ class SearchEngine(object): def __init__(self, url): self._url = url - def search(self, query, size=1000): + def search(self, query, size=1000, recent=False): """Search an elasticsearch server. `query` parameter is the complicated query structure that @@ -43,10 +43,26 @@ class SearchEngine(object): For certain classes of queries (like faceted ones), this can actually be set very low, as it won't impact the facet counts. + `recent` search only most recent indexe(s), assuming this is basically + a real time query that you only care about the last hour of time. + Using recent dramatically reduces the load on the ES cluster. + The returned result is a ResultSet query. + """ es = pyelasticsearch.ElasticSearch(self._url) - results = es.search(query, size=size) + args = {'size': size} + if recent: + # today's index + datefmt = 'logstash-%Y.%m.%d' + now = datetime.datetime.utcnow() + lasthr = now - datetime.timedelta(hours=1) + indexes = [now.strftime(datefmt)] + if (lasthr.strftime(datefmt) != now.strftime(datefmt)): + indexes.append(lasthr.strftime(datefmt)) + args['index'] = indexes + + results = es.search(query, **args) return ResultSet(results)