have realtime engine only search recent indexes
Elastic Recheck is really 2 things, real time searching, and bulk offline categorization. While the bulk categorization needs to look over the entire dataset, the real time portion is really deadline oriented. So only cares about the last hour's worth of data. As such we really don't need to search *all* the indexes in ES, but only the most recent one (and possibly the one before that if we are near rotation). Implement this via a recent= parameter for our search feature. If set to true then we specify the most recently logstash index. If it turns out that we're within an hour of rotation, also search the one before that. Adjust all the queries the bot uses to be recent=True. This will hopefully reduce the load generated by the bot on the ES cluster. Change-Id: I0dfc295dd9b381acb67f192174edd6fdde06f24c
This commit is contained in:
parent
744d28dfa9
commit
b4591df9e9
@ -196,7 +196,10 @@ class RecheckWatch(threading.Thread):
|
||||
|
||||
for job in event.failed_jobs:
|
||||
job.bugs = set(classifier.classify(
|
||||
event.change, event.rev, job.build_short_uuid))
|
||||
event.change,
|
||||
event.rev,
|
||||
job.build_short_uuid,
|
||||
recent=True))
|
||||
if not event.get_all_bugs():
|
||||
self._read(event)
|
||||
else:
|
||||
|
@ -212,7 +212,7 @@ class Stream(object):
|
||||
|
||||
def _job_console_uploaded(self, change, patch, name, build_short_uuid):
|
||||
query = qb.result_ready(change, patch, name, build_short_uuid)
|
||||
r = self.es.search(query, size='10')
|
||||
r = self.es.search(query, size='10', recent=True)
|
||||
if len(r) == 0:
|
||||
msg = ("Console logs not ready for %s %s,%s,%s" %
|
||||
(name, change, patch, build_short_uuid))
|
||||
@ -223,7 +223,7 @@ class Stream(object):
|
||||
|
||||
def _has_required_files(self, change, patch, name, build_short_uuid):
|
||||
query = qb.files_ready(change, patch, name, build_short_uuid)
|
||||
r = self.es.search(query, size='80')
|
||||
r = self.es.search(query, size='80', recent=True)
|
||||
files = [x['term'] for x in r.terms]
|
||||
required = required_files(name)
|
||||
missing_files = [x for x in required if x not in files]
|
||||
@ -368,7 +368,8 @@ class Classifier():
|
||||
es_query = qb.generic(query, facet=facet)
|
||||
return self.es.search(es_query, size=size)
|
||||
|
||||
def classify(self, change_number, patch_number, build_short_uuid):
|
||||
def classify(self, change_number, patch_number,
|
||||
build_short_uuid, recent=False):
|
||||
"""Returns either empty list or list with matched bugs."""
|
||||
self.log.debug("Entering classify")
|
||||
#Reload each time
|
||||
@ -380,7 +381,7 @@ class Classifier():
|
||||
% x['bug'])
|
||||
query = qb.single_patch(x['query'], change_number, patch_number,
|
||||
build_short_uuid)
|
||||
results = self.es.search(query, size='10')
|
||||
results = self.es.search(query, size='10', recent=recent)
|
||||
if len(results) > 0:
|
||||
bug_matches.append(x['bug'])
|
||||
return bug_matches
|
||||
|
@ -32,7 +32,7 @@ class SearchEngine(object):
|
||||
def __init__(self, url):
|
||||
self._url = url
|
||||
|
||||
def search(self, query, size=1000):
|
||||
def search(self, query, size=1000, recent=False):
|
||||
"""Search an elasticsearch server.
|
||||
|
||||
`query` parameter is the complicated query structure that
|
||||
@ -43,10 +43,26 @@ class SearchEngine(object):
|
||||
For certain classes of queries (like faceted ones), this can actually
|
||||
be set very low, as it won't impact the facet counts.
|
||||
|
||||
`recent` search only most recent indexe(s), assuming this is basically
|
||||
a real time query that you only care about the last hour of time.
|
||||
Using recent dramatically reduces the load on the ES cluster.
|
||||
|
||||
The returned result is a ResultSet query.
|
||||
|
||||
"""
|
||||
es = pyelasticsearch.ElasticSearch(self._url)
|
||||
results = es.search(query, size=size)
|
||||
args = {'size': size}
|
||||
if recent:
|
||||
# today's index
|
||||
datefmt = 'logstash-%Y.%m.%d'
|
||||
now = datetime.datetime.utcnow()
|
||||
lasthr = now - datetime.timedelta(hours=1)
|
||||
indexes = [now.strftime(datefmt)]
|
||||
if (lasthr.strftime(datefmt) != now.strftime(datefmt)):
|
||||
indexes.append(lasthr.strftime(datefmt))
|
||||
args['index'] = indexes
|
||||
|
||||
results = es.search(query, **args)
|
||||
return ResultSet(results)
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user