f4992d0421
Our elasticSearch cluster previously used '@message', but we have since moved over to using just 'message'. The rest of the uses of '@message' were removed in I6fb0aa87a291660df879282e9a7851bbb27e9ac2 Change-Id: I2b5d0f176deddb1b1ab9e831395c3216e927d8bf
189 lines
6.6 KiB
Python
189 lines
6.6 KiB
Python
# Copyright Samsung Electronics 2013. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
"""Elastic search wrapper to make handling results easier."""
|
|
|
|
import copy
|
|
import datetime
|
|
import pprint
|
|
import time
|
|
|
|
import pyelasticsearch
|
|
|
|
|
|
# Module-level pretty-printer; Hit.__repr__ uses it to format raw hit dicts.
pp = pprint.PrettyPrinter()
|
|
|
|
|
|
class SearchEngine(object):
    """Thin pyelasticsearch front end that hands back ResultSet objects."""

    def __init__(self, url):
        # Only the server address is remembered here; a client object is
        # built for each search call.
        self._url = url

    def search(self, query, size=1000):
        """Run a query against the elasticsearch server.

        `query` is the (rather involved) query structure understood by
        pyelasticsearch; see that project's documentation for details.

        `size` caps the number of results the search engine returns. It
        defaults to 1000 so that we don't lose things. For some classes
        of queries (faceted ones, for instance) it can be set very low,
        because it does not affect the facet counts.

        Returns the raw response wrapped in a ResultSet.
        """
        client = pyelasticsearch.ElasticSearch(self._url)
        return ResultSet(client.search(query, size=size))
|
|
|
|
|
|
class ResultSet(list):
    """An easy iterator object for handling elasticsearch results.

    pyelasticsearch returns very complex result structures, and manipulating
    them directly is both ugly and error prone. The point of this wrapper
    class is to give us a container that makes working with pyes results
    more natural.

    For instance:
        results = se.search(...)
        for hit in results:
            print(hit.build_status)

    This greatly simplifies code that is interacting with search results,
    and allows us to handle some schema instability with elasticsearch,
    through adapting our __getattr__ methods.

    Design goals for ResultSet are that it is an iterator, and that all the
    data that we want to work with is mapped to a flat attribute namespace
    (pyes goes way overboard with nesting, which is fine in the general
    case, but in the elastic_recheck case is just added complexity).
    """

    def __init__(self, results=None):
        """Wrap a raw search response.

        `results` is the response dictionary from the search call; when it
        is omitted the set is empty. Every entry under results['hits']
        ['hits'] is wrapped in a Hit and appended to this list.
        """
        # A None sentinel avoids sharing one mutable default dict between
        # every ResultSet created without arguments.
        self._results = {} if results is None else results
        if 'hits' in self._results:
            self._parse_hits(self._results['hits'])

    def _parse_hits(self, hits):
        """Append a Hit for every raw hit in the response's hits section."""
        # why, oh why elastic search: the hit list is nested as hits->hits
        for hit in hits['hits']:
            list.append(self, Hit(hit))

    def __getattr__(self, attr):
        """Magic __getattr__, flattens the attributes namespace.

        First search to see if a 'tag' facet entry exists by this name,
        secondly look at the top level attributes of the response.
        Returns None when the name is found in neither place.

        Raises AttributeError for `_results` and for dunder names, so that
        protocol probes (hasattr, copy, pickle) get a proper answer and a
        half-constructed instance cannot recurse forever through this
        method.
        """
        if attr == '_results' or (attr.startswith('__')
                                  and attr.endswith('__')):
            raise AttributeError(attr)

        results = self._results
        if 'facets' in results:
            # Responses may carry facets other than 'tag'; treat a missing
            # 'tag' facet as empty instead of raising KeyError.
            tag_facets = results['facets'].get('tag', {})
            if attr in tag_facets:
                return tag_facets[attr]
        if attr in results:
            return results[attr]
        return None
|
|
|
|
|
|
class FacetSet(dict):
    """A dictionary like collection for creating faceted ResultSets.

    Elastic Search doesn't support nested facets, which are incredibly
    useful for things like faceting by build_status then by build_uuid.
    This is a client side implementation that processes a ResultSet
    with an ordered list of facets, and turns it into a data structure
    which is FacetSet -> FacetSet ... -> ResultSet (arbitrary nesting
    of FacetSets with ResultSets as the leaves).

    Treat this basically like a dictionary (which it inherits from).
    """

    def _histogram(self, data, facet, res=3600):
        """A preprocessor for data should we want to bucket it.

        For the "timestamp" facet, `data` is parsed with the format
        "%Y-%m-%dT%H:%M:%S.%fZ", floored to a multiple of `res` seconds
        and returned as integer milliseconds since the epoch. For every
        other facet `data` is returned unchanged.
        """
        if facet != "timestamp":
            return data

        ts = datetime.datetime.strptime(data, "%Y-%m-%dT%H:%M:%S.%fZ")
        # The trailing "Z" marks the value as UTC, so do the arithmetic
        # against a naive UTC epoch. Going through time.mktime() would
        # read the value as local time and misplace bucket boundaries
        # whenever the local UTC offset is not a multiple of `res`.
        elapsed = ts - datetime.datetime(1970, 1, 1)
        seconds = elapsed.days * 86400 + elapsed.seconds
        # take the floor based on resolution, then convert to ms
        return int((seconds - seconds % res) * 1000)

    def detect_facets(self, results, facets, res=3600):
        """Group `results` into this FacetSet by an ordered list of facets.

        `results` is an iterable of hits supporting hit[facet] lookup.
        `facets` is the ordered sequence of facet names; it is not
        modified. `res` is the bucket size in seconds applied to the
        "timestamp" facet at every nesting level.

        Hits are grouped by the first facet into ResultSets; if more
        facets remain, each group is replaced by a nested FacetSet built
        from the remaining facets.
        """
        if not facets:
            return

        # Slice rather than pop(0) so the caller's list is left intact.
        facet = facets[0]
        remaining = facets[1:]

        for hit in results:
            attr = self._histogram(hit[facet], facet, res=res)
            if attr not in self:
                dict.setdefault(self, attr, ResultSet())
            self[attr].append(hit)

        # if we still have more facets to go, recurse down
        if remaining:
            newkeys = {}
            for key in self:
                fs = FacetSet()
                fs.detect_facets(self[key], list(remaining), res=res)
                newkeys[key] = fs
            self.update(newkeys)
|
|
|
|
|
|
class Hit(object):
    """A single search hit with its fields exposed as flat attributes."""

    def __init__(self, hit):
        # `hit` is the raw hit dictionary from the search response; its
        # '_source' (and '_index' for index()) entries are read later.
        self._hit = hit

    def index(self):
        """Return the '_index' entry of the raw hit."""
        return self._hit['_index']

    def __getitem__(self, key):
        """Allow hit['name'] as a synonym for hit.name."""
        return self.__getattr__(key)

    def __getattr__(self, attr):
        """flatten out our attr space into a few key types

        new style ES has
            _source[attr] for a flat space
        old style ES has
            _source['@attr'] for things like message, @timestamp
        and
            _source['@fields'][attr] for things like build_name,
            build_status

        also, always collapse down all attributes to singletons, because
        they might be lists if we use multiline processing (which we do
        a lot). In the general case this could be a problem, but the way
        we use logstash, there is only ever one element in these lists.

        Returns None when the attribute is found in none of those places
        (including new style documents that have no '@fields' section),
        or when the stored value is an empty list.

        Raises AttributeError for `_hit` and for dunder names, so that
        protocol probes (hasattr, copy, pickle) get a proper answer and a
        half-constructed instance cannot recurse forever through this
        method.
        """
        if attr == '_hit' or (attr.startswith('__')
                              and attr.endswith('__')):
            raise AttributeError(attr)

        def first(item):
            # Collapse list values to their first element; an empty list
            # has nothing to collapse to, so report it as missing.
            if isinstance(item, list):
                return item[0] if item else None
            return item

        source = self._hit['_source']
        at_attr = "@%s" % attr
        if attr in source:
            return first(source[attr])
        if at_attr in source:
            return first(source[at_attr])
        # Only old style documents carry '@fields'; treat its absence as
        # an empty section rather than raising KeyError.
        fields = source.get('@fields', {})
        if attr in fields:
            return first(fields[attr])
        return None

    def __repr__(self):
        return pp.pformat(self._hit)
|