validations-libs/validations_libs/validation_logs.py
matbu 8d9e1b5dfd Move No hosts matched status to skipped instead of failed
When no hosts matched the callback and the cli return the validation
as failed state.
This patch move this status as 'skipped' and ignored instead of 'failed'.

Resolves: rhbz#2237500
Change-Id: I82bba29f802ef98e327c5421ef9a5079a3586d70
2023-09-12 17:20:53 +02:00

576 lines
19 KiB
Python

# Copyright 2020 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import json
import glob
from validations_libs.logger import getLogger
import os
import time
from os.path import join
from validations_libs import constants
LOG = getLogger(__name__ + ".validation_logs")
class ValidationLog:
"""An object for encapsulating a Validation Log file"""
def __init__(self, uuid=None, validation_id=None, logfile=None,
log_path=constants.VALIDATIONS_LOG_BASEDIR,
extension='json'):
"""Wrap the Validation Log file
:param uuid: The uuid of the validation execution
:type uuid: ``string``
:param validation_id: The ID of the validation
:type validation_id: ``string``
:param logfile: The absolute path of the log file
:type logfile: ``string`
:param log_path: The absolute path of the logs directory
:type log_path: ``string``
:param extension: The file extension (Default to 'json')
:type extension: ``string``
"""
# Set properties
self.uuid = uuid
self.validation_id = validation_id
self.abs_log_path = log_path
self.extension = extension
self.content = {}
self.name = None
self.datetime = None
# Get full path and content raise exception if it's impossible
if logfile:
if os.path.isabs(logfile):
self.abs_log_path = logfile
else:
raise ValueError(
'logfile must be absolute path, but is: {}'.format(logfile)
)
elif uuid and validation_id:
self.abs_log_path = self.get_log_path()
else:
raise Exception(
'When not using logfile argument, the uuid and '
'validation_id have to be set'
)
self.content = self._get_content()
self.name = self._get_name()
self.datetime = self._get_time()
# if we have a log file then extract uuid, validation_id and timestamp
if logfile:
try:
self.uuid, _name = self.name.split('_', 1)
self.validation_id, self.datetime = _name.rsplit('_', 1)
except ValueError:
LOG.warning('Wrong log file format, it should be formed '
'such as {uuid}_{validation-id}_{timestamp}')
def _get_content(self):
try:
with open(self.abs_log_path, 'r') as log_file:
return json.load(log_file)
except IOError:
msg = "log file: {} not found".format(self.abs_log_path)
raise IOError(msg)
except ValueError:
msg = "bad json format for {}".format(self.abs_log_path)
raise ValueError(msg)
def get_log_path(self):
"""Return full path of a validation log"""
# We return occurence 0, because it should be a uniq file name:
return glob.glob("{}/{}_{}_*.{}".format(self.abs_log_path,
self.uuid, self.validation_id,
self.extension))[0]
def _get_name(self):
"""Return name of the log file under the self.full_path
:rtype: ``string``
"""
return os.path.splitext(os.path.basename(self.abs_log_path))[0]
def _get_time(self):
"""Return time component of the log file name
:rtype: ``string``
"""
return self.name.rsplit('_', 1)[-1]
def is_valid_format(self):
"""Return True if the log file is a valid validation format
The validation log file has to contain three level of data.
- ``plays`` will contain the Ansible execution logs of the playbooks
- ``stat`` will contain the statistics for each targeted hosts
- ``validation_output`` will contain only the warning or failed tasks
.. code:: bash
{
'plays': [],
'stats': {},
'validation_output': []
}
:return: ``True`` if the log file is valid, ``False`` if not.
:rtype: ``boolean``
"""
validation_keys = ['stats', 'validation_output', 'plays']
return bool(set(validation_keys).intersection(self.content.keys()))
@property
def get_logfile_infos(self):
"""Return log file information from the log file basename
:return: A list with the UUID, the validation name and the
datetime of the log file
:rtype: ``list``
:Example:
>>> logfile = '/tmp/123_foo_2020-03-30T13:17:22.447857Z.json'
>>> val = ValidationLog(logfile=logfile)
>>> print(val.get_logfile_infos)
['123', 'foo', '2020-03-30T13:17:22.447857Z']
"""
return self.name.replace('.{}'.format(self.extension), '').split('_')
@property
def get_logfile_datetime(self):
"""Return log file datetime from a UUID and a validation ID
:return: The datetime of the log file
:rtype: ``list``
:Example:
>>> logfile = '/tmp/123_foo_2020-03-30T13:17:22.447857Z.json'
>>> val = ValidationLog(logfile=logfile)
>>> print(val.get_logfile_datetime)
['2020-03-30T13:17:22.447857Z']
"""
return self.name.replace('.{}'.format(self.extension),
'').split('_')[2]
@property
def get_logfile_content(self):
"""Return logfile content
:rtype: ``dict``
"""
return self.content
@property
def get_uuid(self):
"""Return log uuid
:rtype: ``string``
"""
return self.uuid
@property
def get_validation_id(self):
"""Return validation id
:rtype: ``string``
"""
return self.validation_id
@property
def get_status(self):
"""Return validation status
:return: 'FAILED' if there are any failed or unreachable validations,
'SKIPPED' if skipped is True and ok is false which means that
the entire validation has been ignored because no host matched,
'PASSED' if none of those conditions.
:rtype: ``string``
"""
failure_states = ['failures', 'unreachable']
for v_stats in self.content['stats'].values():
if any([v_stats[failure] != 0 for failure in failure_states]):
return 'FAILED'
if v_stats['skipped'] and not v_stats['ok']:
return 'SKIPPED'
return 'PASSED'
@property
def get_host_group(self):
"""Return host group
:return: A comma-separated list of host(s)
:rtype: ``string``
"""
return ', '.join([play['play'].get('host') for
play in self.content['plays']])
@property
def get_hosts_status(self):
"""Return status by host(s)
:return: A comma-separated string of host with its status
:rtype: ``string``
:Example:
>>> logfile = '/tmp/123_foo_2020-03-30T13:17:22.447857Z.json'
>>> val = ValidationLog(logfile=logfile)
>>> print(val.get_hosts_status)
'localhost,PASSED, webserver1,FAILED, webserver2,PASSED'
"""
hosts = []
for h in self.content['stats'].keys():
if self.content['stats'][h].get('failures'):
hosts.append('{},{}'.format(h, 'FAILED'))
elif self.content['stats'][h].get('unreachable'):
hosts.append('{},{}'.format(h, 'UNREACHABLE'))
elif self.content['stats'][h].get('skipped') and not self.content['stats'][h].get('ok'):
hosts.append('{},{}'.format(h, 'SKIPPED'))
else:
hosts.append('{},{}'.format(h, 'PASSED'))
return ', '.join(hosts)
@property
def get_unreachable_hosts(self):
"""Return unreachable hosts
:return: A list of unreachable host(s)
:rtype: ``string``
:Example:
- Multiple unreachable hosts
>>> logfile = '/tmp/123_foo_2020-03-30T13:17:22.447857Z.json'
>>> val = ValidationLog(logfile=logfile)
>>> print(val.get_unreachable_hosts)
'localhost, webserver2'
- Only one unreachable host
>>> logfile = '/tmp/123_foo_2020-03-30T13:17:22.447857Z.json'
>>> val = ValidationLog(logfile=logfile)
>>> print(val.get_unreachable_hosts)
'localhost'
- No unreachable host
>>> logfile = '/tmp/123_foo_2020-03-30T13:17:22.447857Z.json'
>>> val = ValidationLog(logfile=logfile)
>>> print(val.get_unreachable_hosts)
''
"""
return ', '.join(h for h in self.content['stats'].keys()
if self.content['stats'][h].get('unreachable'))
@property
def get_duration(self):
"""Return duration of Ansible runtime
:rtype: ``string``
"""
duration = [play['play']['duration'].get('time_elapsed') for
play in self.content['plays']]
return ', '.join(filter(None, duration))
@property
def get_reason(self):
"""Return validation reason
:return: hostname: reason of the failure
:rtype: ``string``
"""
reason = []
if self.get_status == 'FAILED':
for v_output in self.content['validation_output']:
for h in v_output['task']['hosts']:
msg = v_output['task']['hosts'][h].get('msg',
'Unknown')
if isinstance(msg, list):
msg = ''.join(msg)
try:
msg = msg[:50] + '\n' + msg[50:]
reason.append('{}: {}'.format(h, msg))
except TypeError:
LOG.warning('Wrong failure message type. skipping...')
reason.append('Unknown')
if not self.content['validation_output']:
if self.get_unreachable_hosts:
reason.append('Unreachable')
return ',\n'.join(reason)
@property
def get_start_time(self):
"""Return Ansible start time
:rtype: ``string``
"""
start_time = [play['play']['duration'].get('start') for
play in self.content['plays']]
return ', '.join(filter(None, start_time))
@property
def get_plays(self):
"""Return a list of Playbook data"""
return [play['play'] for play in self.content['plays']]
@property
def get_tasks_data(self):
"""Return a list of task from validation output"""
return [output['task'] for output in self.content['validation_output']]
class ValidationLogs(object):
"""An object for encapsulating the Validation Log files"""
def __init__(self, logs_path=constants.VALIDATIONS_LOG_BASEDIR):
self.logs_path = logs_path
def _get_content(self, file):
try:
with open(file, 'r') as log_file:
return json.load(log_file)
except IOError:
msg = "log file: {} not found".format(file)
raise IOError(msg)
def get_logfile_by_validation(self, validation_id):
"""Return logfiles by validation_id
:param validation_id: The ID of the validation
:type validation_id: ``string``
:return: The list of the log files for a validation
:rtype: ``list``
"""
return glob.glob("{}/*_{}_*".format(self.logs_path, validation_id))
def get_logfile_content_by_validation(self, validation_id):
"""Return logfiles content by validation_id
:param validation_id: The ID of the validation
:type validation_id: ``string``
:return: The list of the log files contents for a validation
:rtype: ``list``
"""
log_files = glob.glob("{}/*_{}_*".format(self.logs_path,
validation_id))
LOG.debug(
"Getting log file for validation {} from {}.".format(
validation_id,
log_files)
)
return [self._get_content(log) for log in log_files]
def get_logfile_by_uuid(self, uuid):
"""Return logfiles by uuid
:param uuid: The UUID of the validation execution
:type uuid: ``string``
:return: The list of the log files by UUID
:rtype: ``list``
"""
return glob.glob("{}/{}_*".format(self.logs_path, uuid))
def get_logfile_content_by_uuid(self, uuid):
"""Return logfiles content by uuid
:param uuid: The UUID of the validation execution
:type uuid: ``string``
:return: The list of the log files contents by UUID
:rtype: ``list``
"""
log_files = glob.glob("{}/{}_*".format(self.logs_path, uuid))
return [self._get_content(log) for log in log_files]
def get_logfile_by_uuid_validation_id(self, uuid, validation_id):
"""Return logfiles by uuid and validation_id
:param uuid: The UUID of the validation execution
:type uuid: ``string``
:param validation_id: The ID of the validation
:type validation_id: ``string``
:return: A list of the log files by UUID and validation_id
:rtype: ``list``
"""
return glob.glob("{}/{}_{}_*".format(self.logs_path, uuid,
validation_id))
def get_logfile_content_by_uuid_validation_id(self, uuid, validation_id):
"""Return logfiles content filter by uuid and validation_id
:param uuid: The UUID of the validation execution
:type uuid: ``string``
:param validation_id: The ID of the validation
:type validation_id: ``string``
:return: A list of the log files content by UUID and validation_id
:rtype: ``list``
"""
log_files = glob.glob("{}/{}_{}_*".format(self.logs_path, uuid,
validation_id))
return [self._get_content(log) for log in log_files]
def get_all_logfiles(self, extension='json'):
"""Return logfiles from logs_path
:param extension: The extension file (Defaults to 'json')
:type extension: ``string``
:return: A list of the absolute path log files
:rtype: ``list``
"""
return [join(self.logs_path, f) for f in os.listdir(self.logs_path) if
os.path.isfile(join(self.logs_path, f)) and extension in
os.path.splitext(join(self.logs_path, f))[1]]
def get_all_logfiles_content(self):
"""Return logfiles content
:return: A list of the contents of every log files available
:rtype: ``list``
"""
return [self._get_content(join(self.logs_path, f))
for f in os.listdir(self.logs_path)
if os.path.isfile(join(self.logs_path, f))]
def get_validations_stats(self, logs):
"""Return validations stats from log files
:param logs: A list of log file contents
:type logs: ``list``
:return: Information about validation statistics.
``last execution date`` and ``number of execution``
:rtype: ``dict``
"""
if not isinstance(logs, list):
logs = [logs]
LOG.debug(
("`get_validations_stats` received `logs` argument "
"of type {} but it expects a list. "
"Attempting to resolve.").format(
type(logs))
)
# Get validation stats
total_number = len(logs)
failed_number = 0
passed_number = 0
last_execution = None
dates = []
LOG.debug(
"Retreiving {} validation stats.".format(total_number)
)
for log in logs:
if log.get('validation_output'):
failed_number += 1
else:
passed_number += 1
date_time = \
log['plays'][0]['play']['duration'].get('start').split('T')
date_start = date_time[0]
time_start = date_time[1].split('Z')[0]
newdate = \
time.strptime(date_start + time_start, '%Y-%m-%d%H:%M:%S.%f')
dates.append(newdate)
if dates:
last_execution = time.strftime('%Y-%m-%d %H:%M:%S', max(dates))
execution_stats = "Total: {}, Passed: {}, Failed: {}".format(
total_number,
passed_number,
failed_number)
LOG.debug(execution_stats)
return {"Last execution date": last_execution,
"Number of execution": execution_stats}
def get_results(self, uuid, validation_id=None):
"""Return a list of validation results by uuid
Can be filter by validation_id
:param uuid: The UUID of the validation execution
:type uuid: ``string` or ``list``
:param validation_id: The ID of the validation
:type validation_id: ``string``
:return: A list of the log files content by UUID and validation_id
:rtype: ``list``
:Example:
>>> v_logs = ValidationLogs()
>>> uuid = '78df1c3f-dfc3-4a1f-929e-f51762e67700'
>>> print(v_logs.get_results(uuid=uuid)
[{'Duration': '0:00:00.514',
'Host_Group': 'undercloud,Controller',
'Status': 'FAILED',
'Status_by_Host': 'undercloud,FAILED, underclou1d,FAILED',
'UUID': '78df1c3f-dfc3-4a1f-929e-f51762e67700',
'Unreachable_Hosts': 'undercloud',
'Validations': 'check-cpu'}]
"""
if isinstance(uuid, list):
results = []
for identifier in uuid:
results.extend(self.get_logfile_by_uuid_validation_id(
identifier,
validation_id)
if validation_id else
self.get_logfile_by_uuid(identifier))
elif isinstance(uuid, str):
results = (self.get_logfile_by_uuid_validation_id(uuid,
validation_id)
if validation_id else self.get_logfile_by_uuid(uuid))
else:
raise RuntimeError(
(
"uuid should be either a str or a list"
"but is {} instead"
).format(type(uuid))
)
res = []
for result in results:
vlog = ValidationLog(logfile=result)
data = {}
data['UUID'] = vlog.get_uuid
data['Validations'] = vlog.get_validation_id
data['Status'] = vlog.get_status
data['Host_Group'] = vlog.get_host_group
data['Status_by_Host'] = vlog.get_hosts_status
data['Unreachable_Hosts'] = vlog.get_unreachable_hosts
data['Duration'] = vlog.get_duration
data['Reasons'] = vlog.get_reason
res.append(data)
return res