bootstrap failure and resume functionality

This commit added feature for test bootstrap failure and resume
functionality

Change-Id: I6a600a4c8acdc37a643e0d38ea02c93fc41e503b
Signed-off-by: Abhishek jaiswal <abhishek.jaiswal@windriver.com>
This commit is contained in:
Abhishek jaiswal
2025-05-01 09:49:51 -04:00
parent d55c1f8459
commit fc1dc13678
3 changed files with 126 additions and 0 deletions

View File

@@ -0,0 +1,42 @@
import yaml
class DeploymentAssetsHandler:
"""
A class for processing YAML files, including reading, writing, and updating values.
"""
def __init__(self, local_file_yaml: str):
"""Initializes the YamlProcessor.
Args:
local_file_yaml (str): yaml file path
"""
self.local_file = local_file_yaml
self.data = self._read_yaml()
def _read_yaml(self) -> None:
"""Reads the YAML file from the local file."""
try:
with open(self.local_file, "r") as local_file_st:
data = yaml.safe_load(local_file_st)
except FileNotFoundError:
raise FileNotFoundError(f"File not found locally: {self.local_file}")
except yaml.YAMLError as e:
raise yaml.YAMLError(f"Error parsing YAML from local file: {e}")
return data
def write_yaml(self):
"""Write the YAML to the local file."""
try:
# Write to local file
with open(self.local_file, "w") as local_file_st:
yaml.dump(self.data, local_file_st, indent=2)
except Exception as e:
raise Exception(f"Error writing YAML file: {e}")
def inject_wrong_bootstrap_value(self):
"""Function to change docker registry value"""
# inject wrong values in Bootstrap values
self.data["docker_registries"]["k8s.gcr.io"]["url"] = "registry.central:9001/k8s.gcr.io.wrong"
self.write_yaml()

View File

@@ -307,3 +307,14 @@ class FileKeywords(BaseKeyword):
except Exception as e:
get_logger().log_error(f"Failed to rsync file from {remote_user}@{remote_server}:{remote_path} to {local_dest_path}: {e}")
raise KeywordException(f"Failed to rsync file from {remote_user}@{remote_server}:{remote_path} to {local_dest_path}: {e}") from e
return True
def copy_file(self, src_file: str, dest_file: str):
"""Copies a file from the source path to the destination path.
Args:
src_file (str): The source file path.
dest_file (str): The destination file path.
"""
self.ssh_connection.send(f"cp {src_file} {dest_file}")

View File

@@ -1,14 +1,19 @@
import os
import time
from pytest import mark
from config.configuration_manager import ConfigurationManager
from framework.logging.automation_logger import get_logger
from framework.ssh.ssh_connection import SSHConnection
from framework.validation.validation import validate_equals
from keywords.cloud_platform.dcmanager.dcmanager_subcloud_delete_keywords import DcManagerSubcloudDeleteKeywords
from keywords.cloud_platform.dcmanager.dcmanager_subcloud_deploy_keywords import DCManagerSubcloudDeployKeywords
from keywords.cloud_platform.dcmanager.dcmanager_subcloud_list_keywords import DcManagerSubcloudListKeywords
from keywords.cloud_platform.dcmanager.dcmanager_subcloud_manager_keywords import DcManagerSubcloudManagerKeywords
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from keywords.cloud_platform.yaml.deployment_assets_yaml import DeploymentAssetsHandler
from keywords.files.file_keywords import FileKeywords
def get_undeployed_subcloud_name(ssh_connection: SSHConnection) -> str:
@@ -123,3 +128,71 @@ def test_subcloud_deploy_abort_resume(request):
dcm_sc_list_kw.validate_subcloud_status(subcloud_name, "bootstrap-aborted")
# dcmanager subcloud deploy resume
dcm_sc_deploy_kw.dcmanager_subcloud_deploy_resume(subcloud_name)
def test_bootstrap_failure_replay():
"""Test bootstrap failure and resume
Test Steps:
- log onto system controller
- get the ssh connection to the active controller
- check first to see if there is an undeployed subcloud if yes run test
- If we don't have a subcloud that is undeployed, remove it and then run the test.
- backup the original bootstrap values yaml file
- download the bootstrap values yaml file
- inject the wrong docker registery
- execute the phased deployment create
- execute the phased deployment install
- execute the phased deployment bootstrap
- wait for bootstrap to fail
- restore original bootstrap values yaml file
- execute the phased deployment resume
"""
ssh_connection = LabConnectionKeywords().get_active_controller_ssh()
subcloud_name = get_undeployed_subcloud_name(ssh_connection)
# Get the subcloud deployment assets
deployment_assets_config = ConfigurationManager.get_deployment_assets_config()
sc_assets = deployment_assets_config.get_subcloud_deployment_assets(subcloud_name)
bootstrap_file = sc_assets.get_bootstrap_file()
directory, local_bootstrap_file = os.path.split(bootstrap_file)
bootstrap_bkup_file = f"{bootstrap_file}.bkup"
# backup the file before editing
FileKeywords(ssh_connection).copy_file(bootstrap_file, bootstrap_bkup_file)
# download file before for updation
FileKeywords(ssh_connection).download_file(bootstrap_file, local_bootstrap_file)
# download bootstrap values file
bootstrap_yaml_obj = DeploymentAssetsHandler(local_bootstrap_file)
bootstrap_yaml_obj.inject_wrong_bootstrap_value()
# upload file
FileKeywords(ssh_connection).upload_file(local_bootstrap_file, bootstrap_file)
# deploy the subcloud with wrong values
dcm_sc_deploy_kw = DCManagerSubcloudDeployKeywords(ssh_connection)
# dcmanager subcloud deploy create
get_logger().log_info(f"dcmanager subcloud deploy create {subcloud_name}.")
dcm_sc_deploy_kw.dcmanager_subcloud_deploy_create(subcloud_name)
# dcmanager subcloud deploy install
get_logger().log_info(f"dcmanager subcloud deploy install {subcloud_name}.")
dcm_sc_deploy_kw.dcmanager_subcloud_deploy_install(subcloud_name)
# dcmanager subcloud deploy bootstrap
get_logger().log_info(f"dcmanager subcloud deploy bootstrap {subcloud_name}.")
dcm_sc_deploy_kw.dcmanager_subcloud_deploy_bootstrap(subcloud_name, wait_for_status=False)
dc_manager_sc_list_kw = DcManagerSubcloudListKeywords(ssh_connection)
dc_manager_sc_list_kw.validate_subcloud_status(subcloud_name, "bootstrap-failed")
# restore original file
FileKeywords(ssh_connection).rename_file(bootstrap_bkup_file, bootstrap_file)
# dcmanager subcloud deploy resume
dcm_sc_deploy_kw.dcmanager_subcloud_deploy_resume(subcloud_name)
# dcmanager subcloud deploy config
get_logger().log_info(f"dcmanager subcloud deploy config {subcloud_name}.")
dcm_sc_deploy_kw.dcmanager_subcloud_deploy_config(subcloud_name)
sc_status = dc_manager_sc_list_kw.get_dcmanager_subcloud_list().get_subcloud_by_name(subcloud_name).get_deploy_status()
validate_equals(sc_status, "complete", "Validate that subcloud is now deployed")