Fix elastic-recheck query command
This code doesn't work at all. Bring it back to life. Also accept
inputs from a config file.

Closes-Bug: #1526921
Change-Id: I8f45dc9d42f7547f9d849686739b9a641c176814
commit 19712edc2d
parent 06091e7b1c
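
With this change, erquery can read its Elasticsearch endpoint from a
configuration file instead of a hard-coded constant. As a rough sketch, a
file passed via the new -c/--conf option would look something like the
following; the [data_source] section and es_url option names come from the
diff below, while the URL value is purely illustrative:

    [data_source]
    es_url = http://elasticsearch.example.com:9200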
elastic_recheck/cmd/query.py

@@ -15,20 +15,17 @@
 # under the License.
 
 import argparse
-import base64
+import ConfigParser
 import itertools
 import json
-import time
 
-import requests
 import yaml
 
-from elastic_recheck import log as logging
+import elastic_recheck.elasticRecheck as er
+import elastic_recheck.log as logging
+import elastic_recheck.results as er_results
 
 LOG = logging.getLogger('erquery')
 
-ENDPOINT = 'http://logstash.openstack.org/api'
-
 DEFAULT_NUMBER_OF_DAYS = 10
 DEFAULT_MAX_QUANTITY = 5
 IGNORED_ATTRIBUTES = [
@@ -44,42 +41,6 @@ IGNORED_ATTRIBUTES = [
 ]
 
 
-def _GET(path):
-    r = requests.get(ENDPOINT + path)
-
-    if r.status_code != requests.codes.ok:
-        LOG.info('Got HTTP %s, retrying...' % r.status_code)
-        # retry once
-        r = requests.get(ENDPOINT + path)
-
-    try:
-        return r.json()
-    except Exception:
-        raise SystemExit(r.text)
-
-
-def _encode(q):
-    """Encode a JSON dict for inclusion in a URL."""
-    return base64.b64encode(json.dumps(q))
-
-
-def _unix_time_in_microseconds():
-    return int(time.time() * 1000)
-
-
-def search(q, days):
-    search = {
-        'search': q,
-        'fields': [],
-        'offset': 0,
-        'timeframe': str(days * 86400),
-        'graphmode': 'count',
-        'time': {
-            'user_interval': 0},
-        'stamp': _unix_time_in_microseconds()}
-    return _GET('/search/%s' % _encode(search))
-
-
 def analyze_attributes(attributes):
     analysis = {}
     for attribute, values in attributes.iteritems():
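
The logstash-API helpers removed above (_GET, _encode, search) are
superseded by the SearchEngine client from elastic_recheck.results, as the
next hunk shows. Roughly, the old search(q=query, days=days) call becomes a
sketch like this, with the query string only illustrative:

    import elastic_recheck.elasticRecheck as er
    import elastic_recheck.results as er_results

    query = 'message:"some failure signature"'  # illustrative query string
    # SearchEngine talks to Elasticsearch directly and wraps the
    # response in a ResultSet rather than returning raw JSON.
    es = er_results.SearchEngine(er.ES_URL)
    r = es.search(query, days=10)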
@@ -102,17 +63,21 @@ def analyze_attributes(attributes):
     return analysis
 
 
-def query(query_file_name, days=DEFAULT_NUMBER_OF_DAYS,
+def query(query_file_name, days=DEFAULT_NUMBER_OF_DAYS, es_url=er.ES_URL,
           quantity=DEFAULT_MAX_QUANTITY, verbose=False):
+
+    es = er_results.SearchEngine(es_url)
+
     with open(query_file_name) as f:
         query_file = yaml.load(f.read())
         query = query_file['query']
 
-    r = search(q=query, days=days)
-    print('total hits: %s' % r['hits']['total'])
+    r = es.search(query, days=days)
+    print('total hits: %s' % r.hits['total'])
 
     attributes = {}
-    for hit in r['hits']['hits']:
+    for hit in r.hits['hits']:
         for key, value in hit['_source'].iteritems():
             value_hash = json.dumps(value)
             attributes.setdefault(key, {}).setdefault(value_hash, 0)
@@ -125,7 +90,7 @@ def query(query_file_name, days=DEFAULT_NUMBER_OF_DAYS,
             continue
 
         print(attribute)
-        for percentage, value in itertools.islice(results, None, quantity):
+        for percentage, value in itertools.islice(results, quantity):
             if isinstance(value, list):
                 value = ' '.join(unicode(x) for x in value)
             print(' %d%% %s' % (percentage, value))
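
Called from Python, the reworked query() might be exercised like this
sketch; the YAML path is the sample query file used by the unit test below,
and the keyword values are illustrative:

    from elastic_recheck.cmd import query

    # es_url falls back to er.ES_URL when no config file is supplied.
    query.query('elastic_recheck/tests/unit/queries/1284371.yaml',
                days=7, quantity=3, verbose=True)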
@@ -147,9 +112,22 @@ def main():
     parser.add_argument(
         '--verbose', '-v', action='store_true', default=False,
         help='Report on additional query metadata.')
+    parser.add_argument('-c', '--conf', help="Elastic Recheck Configuration "
+                        "file to use for data_source options such as "
+                        "elastic search url, logstash url, and database uri.")
     args = parser.parse_args()
 
-    query(args.query_file.name, args.days, args.quantity, args.verbose)
+    # Start with defaults
+    es_url = er.ES_URL
+
+    if args.conf:
+        config = ConfigParser.ConfigParser({'es_url': er.ES_URL})
+        config.read(args.conf)
+        if config.has_section('data_source'):
+            es_url = config.get('data_source', 'es_url')
+
+    query(args.query_file.name, days=args.days, quantity=args.quantity,
+          verbose=args.verbose, es_url=es_url)
 
 
 if __name__ == "__main__":
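
Assuming the console entry point for this module is named
elastic-recheck-query (the entry point name is not shown in this diff), the
new flag would be used along these lines, with my-query.yaml and
elastic-recheck.conf standing in for real files:

    elastic-recheck-query -c elastic-recheck.conf my-query.yaml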
elastic_recheck/results.py

@@ -32,7 +32,7 @@ class SearchEngine(object):
     def __init__(self, url):
         self._url = url
 
-    def search(self, query, size=1000, recent=False):
+    def search(self, query, size=1000, recent=False, days=0):
         """Search an elasticsearch server.
 
         `query` parameter is the complicated query structure that
@@ -47,19 +47,25 @@ class SearchEngine(object):
         a real time query that you only care about the last hour of time.
         Using recent dramatically reduces the load on the ES cluster.
 
+        `days` search only the last number of days.
+
         The returned result is a ResultSet query.
         """
         es = pyelasticsearch.ElasticSearch(self._url)
         args = {'size': size}
-        if recent:
+        if recent or days:
             # today's index
             datefmt = 'logstash-%Y.%m.%d'
             now = datetime.datetime.utcnow()
-            lasthr = now - datetime.timedelta(hours=1)
             indexes = [now.strftime(datefmt)]
-            if (lasthr.strftime(datefmt) != now.strftime(datefmt)):
-                indexes.append(lasthr.strftime(datefmt))
+            if recent:
+                lasthr = now - datetime.timedelta(hours=1)
+                if lasthr.strftime(datefmt) != now.strftime(datefmt):
+                    indexes.append(lasthr.strftime(datefmt))
+            for day in range(1, days):
+                lastday = now - datetime.timedelta(days=day)
+                indexes.append(lastday.strftime(datefmt))
             args['index'] = indexes
 
         results = es.search(query, **args)
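
To make the new days handling concrete, here is a self-contained sketch
that mirrors the index computation from the hunk above (standalone, not
elastic-recheck code); the dates in the comment are only an example:

    import datetime

    datefmt = 'logstash-%Y.%m.%d'
    now = datetime.datetime.utcnow()
    days = 3  # illustrative value

    # Today's index, plus one per prior day: range(1, 3) adds days 1 and 2.
    indexes = [now.strftime(datefmt)]
    for day in range(1, days):
        indexes.append((now - datetime.timedelta(days=day)).strftime(datefmt))

    # e.g. ['logstash-2015.12.16', 'logstash-2015.12.15', 'logstash-2015.12.14']
    print(indexes)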
elastic_recheck/tests/unit/test_query_cmd.py

@@ -17,19 +17,10 @@ import sys
 import mock
 
 from elastic_recheck.cmd import query
+from elastic_recheck.results import ResultSet
 from elastic_recheck.tests import unit
 
 
-class FakeResponse(object):
-    def __init__(self, response_text):
-        super(FakeResponse, self).__init__()
-        self.text = response_text
-        self.status_code = 200
-
-    def json(self):
-        return json.loads(self.text)
-
-
 class TestQueryCmd(unit.UnitTestCase):
     def setUp(self):
         super(TestQueryCmd, self).setUp()
@@ -43,9 +34,11 @@ class TestQueryCmd(unit.UnitTestCase):
     def test_query(self):
         with open('elastic_recheck/tests/unit/logstash/1284371.analysis') as f:
             expected_stdout = f.read()
-        with mock.patch('requests.get') as mock_get:
+        with mock.patch('elastic_recheck.results.SearchEngine.search') as \
+                mock_search:
             with open('elastic_recheck/tests/unit/logstash/1284371.json') as f:
-                mock_get.return_value = FakeResponse(f.read())
+                jsonResponse = json.loads(f.read())
+                mock_search.return_value = ResultSet(jsonResponse)
             query.query('elastic_recheck/tests/unit/queries/1284371.yaml')
             sys.stdout.seek(0)
             self.assertEqual(expected_stdout, sys.stdout.read())