Merge "have realtime engine only search recent indexes"

Jenkins 2014-06-13 16:17:58 +00:00 committed by Gerrit Code Review
commit 99a592fa50
3 changed files with 27 additions and 7 deletions

View File

@@ -196,7 +196,10 @@ class RecheckWatch(threading.Thread):
             for job in event.failed_jobs:
                 job.bugs = set(classifier.classify(
-                    event.change, event.rev, job.build_short_uuid))
+                    event.change,
+                    event.rev,
+                    job.build_short_uuid,
+                    recent=True))
             if not event.get_all_bugs():
                 self._read(event)
             else:

View File

@@ -215,7 +215,7 @@ class Stream(object):
     def _job_console_uploaded(self, change, patch, name, build_short_uuid):
         query = qb.result_ready(change, patch, name, build_short_uuid)
-        r = self.es.search(query, size='10')
+        r = self.es.search(query, size='10', recent=True)
         if len(r) == 0:
             msg = ("Console logs not ready for %s %s,%s,%s" %
                    (name, change, patch, build_short_uuid))
@@ -226,7 +226,7 @@ class Stream(object):
     def _has_required_files(self, change, patch, name, build_short_uuid):
         query = qb.files_ready(change, patch, name, build_short_uuid)
-        r = self.es.search(query, size='80')
+        r = self.es.search(query, size='80', recent=True)
         files = [x['term'] for x in r.terms]
         required = required_files(name)
         missing_files = [x for x in required if x not in files]
@@ -371,7 +371,8 @@ class Classifier():
         es_query = qb.generic(query, facet=facet)
         return self.es.search(es_query, size=size)

-    def classify(self, change_number, patch_number, build_short_uuid):
+    def classify(self, change_number, patch_number,
+                 build_short_uuid, recent=False):
         """Returns either empty list or list with matched bugs."""
         self.log.debug("Entering classify")
         #Reload each time
@@ -383,7 +384,7 @@ class Classifier():
                           % x['bug'])
             query = qb.single_patch(x['query'], change_number, patch_number,
                                     build_short_uuid)
-            results = self.es.search(query, size='10')
+            results = self.es.search(query, size='10', recent=recent)
             if len(results) > 0:
                 bug_matches.append(x['bug'])
         return bug_matches
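
For illustration, here is a minimal runnable sketch, using stub classes that are stand-ins rather than code from this repository, of how the new recent flag threads from Classifier.classify() down to SearchEngine.search(), and of why the default recent=False leaves existing batch queries searching all indexes as before:

class StubSearchEngine(object):
    def search(self, query, size=1000, recent=False):
        # The real engine narrows the index list when recent=True (see the
        # SearchEngine hunk below); the stub only reports which mode ran.
        return 'recent indexes only' if recent else 'all indexes'


class StubClassifier(object):
    def __init__(self):
        self.es = StubSearchEngine()

    def classify(self, change_number, patch_number,
                 build_short_uuid, recent=False):
        # The flag is threaded straight through to each per-bug query.
        return self.es.search('some-query', size='10', recent=recent)


classifier = StubClassifier()
print(classifier.classify('12345', '2', 'abc123', recent=True))  # realtime bot path
print(classifier.classify('12345', '2', 'abc123'))               # default, unchanged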

View File

@@ -32,7 +32,7 @@ class SearchEngine(object):
     def __init__(self, url):
         self._url = url

-    def search(self, query, size=1000):
+    def search(self, query, size=1000, recent=False):
         """Search an elasticsearch server.

         `query` parameter is the complicated query structure that
@@ -43,10 +43,26 @@ class SearchEngine(object):
         For certain classes of queries (like faceted ones), this can actually
         be set very low, as it won't impact the facet counts.

+        `recent` search only most recent indexe(s), assuming this is basically
+        a real time query that you only care about the last hour of time.
+        Using recent dramatically reduces the load on the ES cluster.
+
         The returned result is a ResultSet query.
         """
         es = pyelasticsearch.ElasticSearch(self._url)
-        results = es.search(query, size=size)
+        args = {'size': size}
+        if recent:
+            # today's index
+            datefmt = 'logstash-%Y.%m.%d'
+            now = datetime.datetime.utcnow()
+            lasthr = now - datetime.timedelta(hours=1)
+            indexes = [now.strftime(datefmt)]
+            if (lasthr.strftime(datefmt) != now.strftime(datefmt)):
+                indexes.append(lasthr.strftime(datefmt))
+            args['index'] = indexes
+        results = es.search(query, **args)
         return ResultSet(results)
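
The index-selection logic in the search() change above can be exercised on its own. Below is a minimal standalone sketch, assuming the daily logstash-YYYY.MM.DD index naming the cluster uses; the recent_indexes() helper and its explicit now argument are illustrative additions, not part of the commit:

import datetime


def recent_indexes(now=None):
    """Return the logstash index(es) covering the last hour of data."""
    datefmt = 'logstash-%Y.%m.%d'
    if now is None:
        now = datetime.datetime.utcnow()
    lasthr = now - datetime.timedelta(hours=1)
    indexes = [now.strftime(datefmt)]
    # Just after midnight UTC the last hour still spills into yesterday's
    # index, so both daily indexes have to be searched.
    if lasthr.strftime(datefmt) != now.strftime(datefmt):
        indexes.append(lasthr.strftime(datefmt))
    return indexes


print(recent_indexes(datetime.datetime(2014, 6, 13, 12, 0)))
# ['logstash-2014.06.13']
print(recent_indexes(datetime.datetime(2014, 6, 14, 0, 30)))
# ['logstash-2014.06.14', 'logstash-2014.06.13']

Passing this list via the index argument means a recent query touches at most two daily indexes instead of every index in the cluster, which is where the load reduction comes from.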