Files
update/software/scripts/deploy-precheck
sshathee f3ab009fd3 deploy precheck circular dependency
This commit helps remove circular dependency of
usm->sysinv->usm. Hosts patch current check is
done in deploy precheck script so that callback
to usm can be avoided.

Depends-On: https://review.opendev.org/c/starlingx/config/+/927046

TestPlan:
PASS: check deploy precheck is not getting timed out
PASS: verify extra param in url query does not affect
on old config code
PASS: verify deploy precheck works for major release upgrade
PASS: verify deploy precheck works for patch release upgrade

Story: 2010676
Task: 50897
Change-Id: Ic32335798e850d3e7060ad817f8427da289d895b
Signed-off-by: sshathee <shunmugam.shatheesh@windriver.com>
2024-08-27 11:34:34 -04:00

424 lines
16 KiB
Python

#!/usr/bin/python3
# -*- encoding: utf-8 -*-
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (c) 2023-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
"""
Run platform upgrade deploy precheck as a standalone executable
"""
import argparse
import json
import os
import re
import requests
import subprocess
import sys
import tempfile
from lxml import etree as ElementTree
from tsconfig.tsconfig import SW_VERSION
import upgrade_utils
# TODO(heitormatsui) keep updated for every release
SUPPORTED_K8S_VERSIONS = [
"v1.24.4",
"v1.25.3",
"v1.26.1",
"v1.27.5",
"v1.28.4",
"v1.29.2",
]
RC_SUCCESS = 0
RC_UNHEALTHY = 3
STATE_AVAILABLE = 'available'
STATE_DEPLOYED = 'deployed'
class HealthCheck(object):
"""This class represents a general health check object
that uses sysinv-client to run system health checks"""
SUCCESS_MSG = 'OK'
FAIL_MSG = 'Fail'
def __init__(self, config):
self._config = config
# get target release from script directory location
self._target_release = re.match("^.*/rel-(\d\d.\d\d.\d+)/", __file__).group(1)
self._major_release = self._target_release.rsplit(".", 1)[0]
# get sysinv token, endpoint and client
self._sysinv_token, self._sysinv_endpoint = \
upgrade_utils.get_token_endpoint(config, service_type="platform")
self._sysinv_client = upgrade_utils.get_sysinv_client(self._sysinv_token,
self._sysinv_endpoint)
def _check_license(self, version):
"""
Validates the current license is valid for the specified version
:param version: version to be checked against installed license
:return: True is license is valid for version, False otherwise
"""
license_dict = self._sysinv_client.license.show()
if license_dict["error"]:
return False
# create temp file with license content to run verify-license binary against it
with tempfile.NamedTemporaryFile(mode="w", delete=True) as license_file:
try:
license_file.write(license_dict["content"])
subprocess.check_call(["/usr/bin/verify-license", # pylint: disable=not-callable
license_file.name,
version])
except subprocess.CalledProcessError:
return False
return True
# TODO(heitormatsui): implement patch precheck targeted against USM
# and implement patch precheck for subcloud
def _check_required_patches_state(self, required_patches, patch_health_check=False):
"""
Check if the required patches are in 'deployed' state, if patch_health_check is
True, the required_patches can be in 'available' state as well.
:param required_patches: list of patches to be checked
:param patch_health_check: boolean if is a patch or upgrage health check
:return: boolean indicating success/failure and list of patches
that are not in the 'deployed' or 'available' state
"""
success = True
releases = self._config.get("releases", "")
releases_in_allowed_states = []
for release in json.loads(releases):
if release['state'] == STATE_DEPLOYED or \
(patch_health_check and release['state'] == STATE_AVAILABLE):
releases_in_allowed_states.append(release)
allowed_patches = [release['release_id'] for release in releases_in_allowed_states]
missing_patches = list(set(required_patches) - set(allowed_patches))
if missing_patches:
success = False
return success, missing_patches
def run_general_health_check(self):
"""Run general health check using sysinv client"""
force = self._config.get("force", False)
health_ok = success = True
alarm_ignore_list = ["900.201"]
api_cmd = self._sysinv_endpoint + "/health/kube-upgrade"
if force:
api_cmd += '/relaxed'
if alarm_ignore_list:
api_cmd += f'?alarm_ignore_list={alarm_ignore_list}'
method = 'GET'
output = upgrade_utils.call_api(self._sysinv_token, method, api_cmd)
# check hosts are patch current
deploy_in_progress = self._config.get("deploy_in_progress", "{}")
deploy_in_progress = json.loads(deploy_in_progress)
if deploy_in_progress:
success = False
from_load = deploy_in_progress["from_release"]
to_load = deploy_in_progress["to_release"]
output += ('All hosts are patch current: [%s]\n') \
% (HealthCheck.FAIL_MSG)
output += ('Deployment in progress: %s to %s\n' % (from_load, to_load))
else:
success = True
output += ('All hosts are patch current: [%s]\n') \
% (HealthCheck.SUCCESS_MSG)
health_ok = health_ok and success
if HealthCheck.FAIL_MSG in output:
success = False
health_ok = health_ok and success
# check installed license
success = self._check_license(self._major_release)
output += 'Installed license is valid: [%s]\n' \
% (HealthCheck.SUCCESS_MSG if success else HealthCheck.FAIL_MSG)
health_ok = health_ok and success
return health_ok, output
class UpgradeHealthCheck(HealthCheck):
"""This class represents a upgrade-specific health check object
that verifies if system is in a valid state for upgrade"""
# TODO(heitormatsui): switch from using upgrade metadata xml to
# the new USM metadata format
def _check_valid_upgrade_path(self):
"""Checks if active release to specified release is a valid upgrade path"""
# Get active release
isystem = self._sysinv_client.isystem.list()[0]
active_release = isystem.software_version
# supported_release is a dict with {release: required_patch}
supported_releases = dict()
# Parse upgrade metadata file for supported upgrade paths
root = ElementTree.parse("/var/www/pages/feed/rel-%s/upgrades/metadata.xml" % self._major_release)
upgrade_root = root.find("supported_upgrades").findall("upgrade")
for upgrade in upgrade_root:
version = upgrade.find("version")
required_patch = upgrade.find("required_patch")
supported_releases.update({version.text: [required_patch.text] if
required_patch is not None else []})
success = active_release in supported_releases
return success, active_release, supported_releases.get(active_release, [])
# TODO(heitormatsui) do we need this check on USM? Remove if we don't
def _check_active_is_controller_0(self):
"""Checks that active controller is controller-0"""
controllers = self._sysinv_client.ihost.list()
for controller in controllers:
if controller.hostname == "controller-0" and \
"Controller-Active" in controller.capabilities["Personality"]:
return True
return False
def _check_kube_version(self, supported_versions):
"""
Check if active k8s version is in a list of supported versions
:param supported_versions: list of supported k8s versions
:return: boolean indicating success/failure and active k8s version
"""
kube_versions = self._sysinv_client.kube_version.list()
active_version = None
for kv in kube_versions:
if kv.state == "active":
active_version = kv.version
break
success = active_version in supported_versions
return success, active_version
def run_health_check(self):
"""Run specific upgrade health checks"""
# run health check for 22.12
# TODO(ShawnLi): remove this once upgrade from 22.12 is deprecated
if SW_VERSION == '22.12':
return self.run_health_check_in_from_release()
health_ok = True
output = ""
# check if it is a valid upgrade path
success, active_release, required_patches = self._check_valid_upgrade_path()
output += 'Valid upgrade path from release %s to %s: [%s]\n' \
% (active_release, self._major_release,
HealthCheck.SUCCESS_MSG if success else HealthCheck.FAIL_MSG)
health_ok = health_ok and success
# check if required patches are deployed
success, missing_patches = self._check_required_patches_state(required_patches)
output += 'Required patches are applied: [%s]\n' \
% (HealthCheck.SUCCESS_MSG if success else HealthCheck.FAIL_MSG)
if not success:
output += '-> Patches not applied: [%s]\n' \
% ', '.join(missing_patches)
health_ok = health_ok and success
# check if k8s version is valid
success, active_version = self._check_kube_version(SUPPORTED_K8S_VERSIONS)
output += 'Active kubernetes version [%s] is a valid supported version: [%s]\n' \
% (active_version, HealthCheck.SUCCESS_MSG if success else HealthCheck.FAIL_MSG)
if not active_version:
output += ('-> Failed to get version info. Upgrade kubernetes to one of the '
'supported versions [%s] and ensure that the kubernetes version '
'information is available in the kubeadm configmap.\n'
'See "system kube-version-list"\n' % ", ".join(SUPPORTED_K8S_VERSIONS))
elif not success:
output += ('-> Upgrade active kubernetes version [%s] to one of the '
'supported versions [%s]. See "system kube-version-list"\n' %
(active_version, ", ".join(SUPPORTED_K8S_VERSIONS)))
health_ok = health_ok and success
# TODO(heitormatsui) Do we need the following check on USM?
# The load is only imported to controller-0. An upgrade can only
# be started when controller-0 is active.
is_controller_0 = self._check_active_is_controller_0()
success = is_controller_0
output += \
'Active controller is controller-0: [%s]\n' \
% (HealthCheck.SUCCESS_MSG if success else HealthCheck.FAIL_MSG)
health_ok = health_ok and success
return health_ok, output
def run_health_check_in_from_release(self):
"""
Run the health check in 22.12 release environment
:return: tuple (success, output)
"""
health_ok = True
output = ""
success, active_release, required_patches = self._check_valid_upgrade_path()
output += 'Valid upgrade path from release %s to %s: [%s]\n' \
% (active_release, self._major_release,
HealthCheck.SUCCESS_MSG if success else HealthCheck.FAIL_MSG)
health_ok = health_ok and success
# check if required patches are deployed
success, missing_patches = self._check_required_patches(required_patches)
output += 'Required patches are applied: [%s]\n' \
% (HealthCheck.SUCCESS_MSG if success else HealthCheck.FAIL_MSG)
if not success:
output += '-> Patches not applied: [%s]\n' \
% ', '.join(missing_patches)
health_ok = health_ok and success
return health_ok, output
def _check_required_patches(self, required_patches):
"""
Check if required patches are applied using the patching API
:return: tuple (success, missing_patches)
"""
try:
patch_token, patch_endpoint = upgrade_utils.get_token_endpoint(
self._config, service_type="patching")
patch_endpoint += "/v1/query/"
response = requests.get(patch_endpoint, headers={
"X-Auth-Token": patch_token}, timeout=10)
except Exception as e:
return False, "Failed to connect to patching API: %s" % e
query_patches = response.json()['pd']
applied_patches = []
for patch_key, patch in query_patches.items():
if patch.get('patchstate') in {'Applied', 'Committed'}:
applied_patches.append(patch_key)
missing_patches = [patch for patch in required_patches if patch not in applied_patches]
success = not missing_patches
return success, missing_patches
class PatchHealthCheck(HealthCheck):
"""This class represents a patch-specific health check object
that verifies if system is in valid state to apply a patch"""
def _get_required_patches(self):
"""Get required patches for a target release"""
releases = self._config.get("releases")
required_patches = []
for release in json.loads(releases):
if release["sw_version"] == self._target_release:
required_patches.extend(release["requires"])
break
return required_patches
def run_health_check(self):
"""Run specific patch health checks"""
health_ok = True
output = ""
# check required patches for target release
required_patches = self._get_required_patches()
success, missing_patches = self._check_required_patches_state(required_patches, True)
output += 'Required patches are deployed or available: [%s]\n' \
% (HealthCheck.SUCCESS_MSG if success else HealthCheck.FAIL_MSG)
if not success:
output += '-> Patches not deployed or available: [%s]\n' \
% ', '.join(missing_patches)
health_ok = health_ok and success
return health_ok, output
def parse_config(args=None):
"""Parse the parameters passed to the script"""
parser = argparse.ArgumentParser(description="Run health checks to verify if the system "
"meets the requirements to deploy a specific "
"release.")
parser.add_argument("--auth_url",
help="Authentication URL",
required=True)
parser.add_argument("--username",
help="Username",
required=True)
parser.add_argument("--password",
help="Password",
required=True)
parser.add_argument("--project_name",
help="Project Name",
required=True)
parser.add_argument("--user_domain_name",
help="User Domain Name",
required=True)
parser.add_argument("--project_domain_name",
help="Project Domain Name",
required=True)
parser.add_argument("--region_name",
help="Region Name",
default="RegionOne")
parser.add_argument("--force",
help="Ignore non-critical health checks",
action="store_true")
parser.add_argument("--patch",
help="Set precheck to run against a patch release",
action="store_true")
parser.add_argument("--releases",
help="Releases",
default="[]")
parser.add_argument("--deploy_in_progress",
help="check hosts are patch current",
default="{}")
# if args was not passed will use sys.argv by default
parsed_args = parser.parse_args(args)
return vars(parsed_args)
def main(argv=None):
config = parse_config(argv)
patch_release = config.get("patch", False)
health_ok = True
output = ""
if patch_release:
health_check = PatchHealthCheck(config)
else:
health_check = UpgradeHealthCheck(config)
# execute general health check
general_health_ok, general_output = health_check.run_general_health_check()
# execute release-specific health check
specific_health_ok, specific_output = health_check.run_health_check()
# combine health check results removing extra line breaks/blank spaces from the output
health_ok = general_health_ok and specific_health_ok
output = general_output.strip() + "\n" + specific_output.strip()
# print health check output and exit
print(output)
if health_ok:
return RC_SUCCESS
else:
return RC_UNHEALTHY
if __name__ == "__main__":
sys.exit(main())