Expose on channel when we time out on logs

We have been timing out on logs a lot, and not noticing. Redo this
logic to be exception-based so we can tell the IRC channel when we
time out on logs, to get to the bottom of reliability issues with
indexing logstash data.

Change-Id: Ia63d801235c6959eb7b97c334291a6d2f06411b6
This commit is contained in:
Sean Dague 2014-01-14 08:31:18 -05:00
parent e75b996e60
commit 359525be40
2 changed files with 46 additions and 22 deletions

View File

@ -125,9 +125,15 @@ class RecheckWatch(threading.Thread):
LOG.info('Compiled Message %s: %s' % (channel, msg)) LOG.info('Compiled Message %s: %s' % (channel, msg))
self.ircbot.send(channel, msg) self.ircbot.send(channel, msg)
def _read(self, data): def print_msg(self, channel, msg):
self.ircbot.send(channel, msg)
def _read(self, data={}, msg=""):
for channel in self.channel_config.channels: for channel in self.channel_config.channels:
if data.get('bug_numbers'): if msg:
if channel in self.channel_config.events['negative']:
self.print_msg(channel, msg)
elif data.get('bug_numbers'):
if channel in self.channel_config.events['positive']: if channel in self.channel_config.events['positive']:
self.error_found(channel, data) self.error_found(channel, data)
else: else:
@ -140,20 +146,24 @@ class RecheckWatch(threading.Thread):
classifier = er.Classifier(self.queries) classifier = er.Classifier(self.queries)
stream = er.Stream(self.username, self.host, self.key) stream = er.Stream(self.username, self.host, self.key)
while True: while True:
event = stream.get_failed_tempest()
change = event['change']['number']
rev = event['patchSet']['number']
change_id = "%s,%s" % (change, rev)
project = event['change']['project']
try: try:
event = stream.get_failed_tempest()
change = event['change']['number']
rev = event['patchSet']['number']
change_id = "%s,%s" % (change, rev)
project = event['change']['project']
bug_numbers = classifier.classify(change, rev) bug_numbers = classifier.classify(change, rev)
if not bug_numbers: if not bug_numbers:
self._read(event) self._read(event)
else: else:
event['bug_numbers'] = bug_numbers event['bug_numbers'] = bug_numbers
self._read(event) self._read(data=event)
if self.commenting: if self.commenting:
stream.leave_comment(project, change_id, bug_numbers) stream.leave_comment(project, change_id, bug_numbers)
except er.ResultTimedOut as e:
LOG.warn(e.msg)
self._read(msg=e.msg)
except Exception: except Exception:
LOG.exception("Uncaught exception processing event.") LOG.exception("Uncaught exception processing event.")

View File

@ -48,8 +48,19 @@ def required_files(job):
return files return files
class ResultNotReady(Exception): class ConsoleNotReady(Exception):
pass def __init__(self, msg):
self.msg = msg
class FilesNotReady(Exception):
def __init__(self, msg):
self.msg = msg
class ResultTimedOut(Exception):
def __init__(self, msg):
self.msg = msg
class Stream(object): class Stream(object):
@ -95,7 +106,9 @@ class Stream(object):
query = qb.result_ready(change, patch, name) query = qb.result_ready(change, patch, name)
r = self.es.search(query, size='10') r = self.es.search(query, size='10')
if len(r) == 0: if len(r) == 0:
raise ResultNotReady() msg = ("Console logs not ready for %s %s,%s" %
(name, change, patch))
raise ConsoleNotReady(msg)
else: else:
LOG.debug("Console ready for %s %s,%s" % LOG.debug("Console ready for %s %s,%s" %
(name, change, patch)) (name, change, patch))
@ -107,7 +120,9 @@ class Stream(object):
required = required_files(name) required = required_files(name)
missing_files = [x for x in required if x not in files] missing_files = [x for x in required if x not in files]
if len(missing_files) != 0: if len(missing_files) != 0:
raise ResultNotReady() msg = ("%s missing for %s %s,%s" % (
change, patch, name, missing_files))
raise FilesNotReady(msg)
def _is_openstack_project(self, event): def _is_openstack_project(self, event):
return "tempest-dsvm-full" in event["comment"] return "tempest-dsvm-full" in event["comment"]
@ -126,9 +141,8 @@ class Stream(object):
change_number, patch_number, job_name) change_number, patch_number, job_name)
break break
except ResultNotReady: except ConsoleNotReady as e:
LOG.debug("Console logs not ready for %s %s,%s" % LOG.debug(e.msg)
(job_name, change_number, patch_number))
time.sleep(SLEEP_TIME) time.sleep(SLEEP_TIME)
continue continue
except pyelasticsearch.exceptions.InvalidJsonResponseError: except pyelasticsearch.exceptions.InvalidJsonResponseError:
@ -142,9 +156,9 @@ class Stream(object):
if i == NUMBER_OF_RETRIES - 1: if i == NUMBER_OF_RETRIES - 1:
elapsed = datetime.datetime.now() - started_at elapsed = datetime.datetime.now() - started_at
LOG.warn("Console logs not available after %ss for %s %s,%s" % msg = ("Console logs not available after %ss for %s %s,%s" %
(elapsed, job_name, change_number, patch_number)) (elapsed, job_name, change_number, patch_number))
return False raise ResultTimedOut(msg)
LOG.debug( LOG.debug(
"Found hits for change_number: %s, patch_number: %s" "Found hits for change_number: %s, patch_number: %s"
@ -160,14 +174,14 @@ class Stream(object):
% (change_number, patch_number)) % (change_number, patch_number))
time.sleep(10) time.sleep(10)
return True return True
except ResultNotReady: except FilesNotReady:
time.sleep(SLEEP_TIME) time.sleep(SLEEP_TIME)
# if we get to the end, we're broken # if we get to the end, we're broken
elapsed = datetime.datetime.now() - started_at elapsed = datetime.datetime.now() - started_at
LOG.warn("Required files not ready after %ss for %s %d,%d" % msg = ("Required files not ready after %ss for %s %d,%d" %
(elapsed, job_name, change_number, patch_number)) (elapsed, job_name, change_number, patch_number))
return False raise ResultTimedOut(msg)
def get_failed_tempest(self): def get_failed_tempest(self):
LOG.debug("entering get_failed_tempest") LOG.debug("entering get_failed_tempest")