Fix OSD removal
When the rook-ceph backend is configured, the system host-stor-delete command does not completely remove the OSD from the cluster: the OSD is deleted from the database, but it remains present in the Ceph cluster. The OSD is therefore never fully decommissioned, which can lead to inconsistencies or resource issues.

In addition, fully decommissioning an OSD requires wiping its disk, but the rpcapi cannot execute the wipe if the host is offline or there is a network issue between the hosts. Also, if the host is reinstalled after the disk was added to the storage backend, the disk is wiped during kickstart, which results in the rook-ceph-operator trying to create a new OSD on the same mount.

Furthermore, the way the disks to be removed are obtained has changed. From now on, when the "host-stor-delete" command is run, the stor is set to the "deleting-with-app" state, and the lifecycle then collects and removes all OSDs in that state.

The removal uses an architecture based on Kubernetes jobs (triggered on lifecycle post-apply): a job runs a script that fully removes the OSDs from the cluster and then triggers the job that wipes the disks backing those OSDs. The lifecycle uses the new yaml templates folder to create the k8s resources needed for this operation.

All places that used the rpcapi to prepare disks now use the ceph wipe-disks job (the same one used by the remove-OSDs job):
- pre-apply: wipe all disks that will be configured by the app apply
- post-apply: triggered by the remove-osds job to fully delete the OSD
- pre-remove: when the ceph cluster is being cleaned up

The sync-osds job was changed to wait only when the deployment OSD count is less than or equal to the database OSD count, so the job does not get stuck during an OSD removal operation.

Test Plan:
- PASS: Remove an OSD from the cluster using host-stor-delete
- PASS: Change the min replication factor and try to remove the OSD
- PASS: Change the deployment_model and redo the tests
- PASS: Add data to an OSD and check that the data was redistributed
- PASS: Check that the OSD was removed from the database
- PASS: Check that expanding an existing cluster was not impacted by the reduction change
- PASS: Install Rook-Ceph and abort the apply in the middle, then try to apply again
- PASS: Add data to the already configured OSDs and try to reapply; verify that no data was lost
- PASS: Add an OSD with host-stor-add and then immediately reinstall the host that houses the OSD
- PASS: Add an OSD with host-stor-add, apply rook-ceph, and then immediately reinstall the host that houses the OSD

Depends-On: https://review.opendev.org/c/starlingx/config/+/937730
Closes-Bug: 2093897
Change-Id: I969f891235b2b7fa6ba0a927a4a8e3419299ecb2
Signed-off-by: Erickson Silva de Oliveira <Erickson.SilvadeOliveira@windriver.com>
Signed-off-by: Gabriel Przybysz Gonçalves Júnior <gabriel.przybyszgoncalvesjunior@windriver.com>
Signed-off-by: Gustavo Ornaghi Antunes <gustavo.ornaghiantunes@windriver.com>
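For reference, the post-apply removal path added by this change boils down to the condensed sketch below. It is a simplified excerpt of the new remove_osds() lifecycle hook shown in the diff that follows (names such as dbapi, constants, LOG and create_job_to_remove_osds() come from the sysinv lifecycle module); the cluster-phase waits, operator scaling, alarm handling and capability updates of the real method are omitted here.

def remove_osds(self, app_op):
    dbapi = app_op._dbapi
    osds_to_remove = []

    # Collect every OSD stor that host-stor-delete marked as deleting-with-app.
    for istor in dbapi.istor_get_list(sort_key='osdid', sort_dir='asc'):
        if (istor.function == constants.STOR_FUNCTION_OSD and
                istor.state == constants.SB_STATE_DELETING_WITH_APP):
            disk = dbapi.idisk_get(istor.idisk_uuid)
            stor = istor.as_dict()
            stor["hostname"] = dbapi.ihost_get(istor.forihostid).hostname
            stor["idisk_node"] = disk.device_node
            stor["target_name"] = disk.device_node.replace("/dev/", "")
            osds_to_remove.append(stor)

    if not osds_to_remove:
        LOG.info("No OSD to be removed")
        return

    # A Kubernetes job purges the OSDs from the Ceph cluster and then launches
    # the per-disk ceph-wipe-disks jobs; is_reinstalling=False because this
    # path is a regular host-stor-delete, not a host reinstall.
    self.create_job_to_remove_osds(osds_to_remove, False)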
@@ -199,12 +199,15 @@ data:
[ "${COUNT_OSDS_TO_DEPLOY}" -eq 0 ] && [[ ! "${HOSTS_READY[*]}" =~ ${HOST_NAME} ]] && HOSTS_READY+=("${HOST_NAME}") && continue

DEPLOYMENT_OSDS_CMD="kubectl get deployment -n rook-ceph -l app=rook-ceph-osd,topology-location-host=${HOST_NAME} -o json"

COUNT_OSDS_DEPLOYMENT_CMD="${DEPLOYMENT_OSDS_CMD} | jq -r '.items | length'"
retries=10
wait_check "${COUNT_OSDS_DEPLOYMENT_CMD}" "== '${COUNT_OSDS_TO_DEPLOY}'" "Waiting for osd to be deployed" ${retries}
return_code=$?
[ ${return_code} -eq 1 ] && echo "Error when checking osds count" && return 1

COUNT_OSDS_DEPLOYMENT=$(eval "${COUNT_OSDS_DEPLOYMENT_CMD}")
if [ "${COUNT_OSDS_DEPLOYMENT}" -le "${COUNT_OSDS_TO_DEPLOY}" ]; then
retries=10
wait_check "${COUNT_OSDS_DEPLOYMENT_CMD}" "== '${COUNT_OSDS_TO_DEPLOY}'" "Waiting for osd to be deployed" ${retries}
return_code=$?
[ ${return_code} -eq 1 ] && echo "Error when checking osds count" && return 1
fi

DEPLOYMENT_OSDS=($(${DEPLOYMENT_OSDS_CMD}))
OSDS_DEPLOYMENT_PATH=($(echo "${DEPLOYMENT_OSDS[@]}" | jq -r ".items[].spec.template.spec.containers[0].env[] | select(.name==\"ROOK_BLOCK_PATH\") | .value"))
@@ -128,6 +128,8 @@ ALARM_TYPE_REPLICATION_MISSING_OSDS = "osds"
ALARM_TYPE_REPLICATION_MISSING_HOSTS = "osd-hosts"
ALARM_TYPE_MISSING_FLOATING_MON = "floating-mon"
ALARM_TYPE_REMOVE_FLOATING_MON = "remove-floating-mon"
ALARM_TYPE_LONG_RUNNING_OPERATIONS_IN_PROGRESS = "long-running-operations"
ALARM_TYPE_REMOVE_OSDS_ABORTED = "remove-osds-aborted"

# Supported network deployments
# - constants.NETWORK_TYPE_CLUSTER_HOST (not validated)
@@ -1,6 +1,6 @@
#
# Copyright (c) 2021 Intel Corporation, Inc.
# Copyright (c) 2024 Wind River Systems, Inc.
# Copyright (c) 2024-2025 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@@ -13,12 +13,16 @@ import re
import json

from os import path
from os import remove as os_remove
from time import sleep
from subprocess import run
from string import Template
from eventlet import Timeout

from kubernetes import config
from kubernetes import client
from kubernetes import utils
from kubernetes.watch import Watch as kube_watch

from fm_api import constants as fm_constants
from fm_api import fm_api
@@ -34,7 +38,6 @@ from sysinv.common import utils as cutils
from sysinv.helm import lifecycle_base as base
from sysinv.helm.lifecycle_constants import LifecycleConstants
from sysinv.helm import lifecycle_utils as lifecycle_utils
from sysinv.agent import rpcapiproxy as agent_rpcapi

LOG = logging.getLogger(__name__)
@@ -53,9 +56,11 @@ class RookCephAppLifecycleOperator(base.AppLifecycleOperator):

# Fluxcd request
if hook_info.lifecycle_type == LifecycleConstants.APP_LIFECYCLE_TYPE_FLUXCD_REQUEST:
if (hook_info.operation == constants.APP_REMOVE_OP and
hook_info.relative_timing == LifecycleConstants.APP_LIFECYCLE_TIMING_PRE):
return self.pre_remove(app_op, context)
if hook_info.relative_timing == LifecycleConstants.APP_LIFECYCLE_TIMING_PRE:
if hook_info.operation == constants.APP_REMOVE_OP:
return self.pre_remove(app_op)
if hook_info.operation == constants.APP_APPLY_OP:
return self.prepare_disks_to_apply(app_op)

# Resources
elif hook_info.lifecycle_type == LifecycleConstants.APP_LIFECYCLE_TYPE_RESOURCE:
@@ -72,7 +77,7 @@ class RookCephAppLifecycleOperator(base.AppLifecycleOperator):
self.update_backend(app, app_op)
if hook_info.relative_timing == LifecycleConstants.APP_LIFECYCLE_TIMING_POST:
if hook_info.operation == constants.APP_APPLY_OP:
return self.post_apply(app, app_op, hook_info, context)
return self.post_apply(app, app_op, hook_info)
elif hook_info.operation == constants.APP_REMOVE_OP:
return self.post_remove(app, app_op)
@@ -145,7 +150,7 @@ class RookCephAppLifecycleOperator(base.AppLifecycleOperator):
LOG.info("Trigger alarms if necessary")
self.pre_apply_raise_alarms(app_op)

def pre_remove(self, app_op, context):
def pre_remove(self, app_op):
""" Pre remove actions

This function run all pre-remove functions to cleanup
@@ -154,12 +159,12 @@ class RookCephAppLifecycleOperator(base.AppLifecycleOperator):

"""
LOG.info("Cleaning up the ceph cluster")
self.cluster_cleanup(app_op, context)
self.cluster_cleanup(app_op)

LOG.info("Removing ceph alarms")
self.remove_alarms(app_op)

def post_apply(self, app, app_op, hook_info, context):
def post_apply(self, app, app_op, hook_info):
""" Post apply actions

:param app: AppOperator.Application object
@@ -171,7 +176,7 @@ class RookCephAppLifecycleOperator(base.AppLifecycleOperator):
self.update_host_fs(app, app_op)
self.update_controller_fs(app, app_op)

self.update_osds(app_op, context)
self.remove_osds(app_op)
self.delete_mon_mgr(app, app_op)

if LifecycleConstants.EXTRA not in hook_info:
@@ -195,7 +200,7 @@ class RookCephAppLifecycleOperator(base.AppLifecycleOperator):
self.delete_mon_mgr(app, app_op)
self.delete_crds()

def cluster_cleanup(self, app_op, context):
def cluster_cleanup(self, app_op):
""" Run cluster cleanup

This function set the cleanup confirmation in the ceph cluster
@@ -485,7 +490,7 @@ class RookCephAppLifecycleOperator(base.AppLifecycleOperator):
# -------
if not is_jobs_completed and not path.isfile(constants.RESTORE_IN_PROGRESS_FLAG):
LOG.info("Cleanup Jobs did not completed. Force removing finalizers and wiping OSDs")
self.wipe_all_osds(app_op._dbapi, context)
self.wipe_all_osds(app_op._dbapi)
self.remove_resource_finalizers()

self.launch_file_cleanup_jobs(app_op._dbapi, kube_batch)
@@ -741,6 +746,10 @@ class RookCephAppLifecycleOperator(base.AppLifecycleOperator):
app_constants.ALARM_TYPE_REMOVE_FLOATING_MON:
"Floating monitor is exclusive for AIO-DX without workers. "
"Move it to the worker node.",
app_constants.ALARM_TYPE_LONG_RUNNING_OPERATIONS_IN_PROGRESS:
"The rook-ceph application is executing long running operations.",
app_constants.ALARM_TYPE_REMOVE_OSDS_ABORTED:
"Remove OSDs operation was aborted, ceph is not responding.",
}

repairs = {
@@ -757,6 +766,10 @@ class RookCephAppLifecycleOperator(base.AppLifecycleOperator):
app_constants.ALARM_TYPE_REMOVE_FLOATING_MON:
"Remove the floating monitor and use the host-fs-add to add a ceph host "
"filesystem (local monitor) to a worker.",
app_constants.ALARM_TYPE_LONG_RUNNING_OPERATIONS_IN_PROGRESS:
"Wait until the rook-ceph application finishes all long running operations.",
app_constants.ALARM_TYPE_REMOVE_OSDS_ABORTED:
"Repair your ceph cluster and reapply the rook-ceph application.",
}

if alarm_type == app_constants.ALARM_TYPE_MISSING_MONS:
@@ -789,6 +802,18 @@ class RookCephAppLifecycleOperator(base.AppLifecycleOperator):
app_constants.HELM_APP_ROOK_CEPH,
app_constants.ALARM_TYPE_REMOVE_FLOATING_MON)

elif alarm_type == app_constants.ALARM_TYPE_LONG_RUNNING_OPERATIONS_IN_PROGRESS:
entity_instance_id = "{}={}-{}".format(
fm_constants.FM_ENTITY_TYPE_APPLICATION,
app_constants.HELM_APP_ROOK_CEPH,
app_constants.ALARM_TYPE_LONG_RUNNING_OPERATIONS_IN_PROGRESS)

elif alarm_type == app_constants.ALARM_TYPE_REMOVE_OSDS_ABORTED:
entity_instance_id = "{}={}-{}".format(
fm_constants.FM_ENTITY_TYPE_APPLICATION,
app_constants.HELM_APP_ROOK_CEPH,
app_constants.ALARM_TYPE_REMOVE_OSDS_ABORTED)

if action == fm_constants.FM_ALARM_STATE_SET:
fault = fm_api.Fault(
alarm_id=fm_constants.FM_ALARM_ID_STORAGE_CEPH,
@@ -828,6 +853,11 @@ class RookCephAppLifecycleOperator(base.AppLifecycleOperator):
"system mode that there are no worker hosts." %
(constants.CEPH_ROOK_DEPLOYMENT_DEDICATED))

# CHECK AND FAIL: Need all long running operations completed
if "has_long_running_operations" in capabilities:
raise exception.LifecycleSemanticCheckException(
"Cannot apply application: Long running operations are in progress.")

# CHECK AND FAIL: Need a minimum 1 OSD
host_with_osds_count, _ = self.get_osd_count(dbapi, ceph_rook_backend)
if host_with_osds_count < 1:
@@ -886,184 +916,6 @@ class RookCephAppLifecycleOperator(base.AppLifecycleOperator):
|
||||
"If the host is unrecoverable, please delete it from the plaform with 'system host-delete'"
|
||||
)
|
||||
|
||||
def get_osds_to_remove(self, dbapi, new_nodes, old_nodes, osd_deployments):
|
||||
osds_info = []
|
||||
hosts_with_osds_to_remove = []
|
||||
|
||||
# Get informations from OSDs deployments
|
||||
for deployment in osd_deployments:
|
||||
env_vars = deployment.spec.template.spec.containers[0].env
|
||||
osd_path_node = next((env.value for env in env_vars if env.name == 'ROOK_BLOCK_PATH'), None)
|
||||
|
||||
try:
|
||||
host = dbapi.ihost_get_by_hostname(deployment.metadata.labels['topology-location-host'])
|
||||
except exception.NodeNotFound:
|
||||
LOG.warn(f"Node {deployment.metadata.labels['topology-location-host']} not found.")
|
||||
break
|
||||
|
||||
disks = dbapi.idisk_get_by_ihost(host.uuid)
|
||||
disk_uuid, disk_node, disk_path = next(((disk.uuid, disk.device_node, disk.device_path)
|
||||
for disk in disks if disk.device_node == osd_path_node), None)
|
||||
|
||||
if osd_path_node and 'ceph-osd-id' in deployment.metadata.labels:
|
||||
osds_info.append({
|
||||
'id': deployment.metadata.labels['ceph-osd-id'],
|
||||
'uuid': disk_uuid,
|
||||
'device_node': disk_node,
|
||||
'device_path': disk_path,
|
||||
'hostname': host.hostname,
|
||||
'hostid': host.id,
|
||||
})
|
||||
|
||||
for new_node in new_nodes:
|
||||
old_node = next((n for n in old_nodes
|
||||
if n['name'] == new_node['name']), None)
|
||||
|
||||
if old_node:
|
||||
new_devices = {device['name'] for device in new_node['devices']}
|
||||
old_devices = {device['name'] for device in old_node['devices']}
|
||||
|
||||
diff = old_devices - new_devices
|
||||
|
||||
try:
|
||||
host = dbapi.ihost_get_by_hostname(new_node['name'])
|
||||
except exception.NodeNotFound:
|
||||
LOG.warn(f"Node {new_node['name']} not found.")
|
||||
break
|
||||
|
||||
if diff:
|
||||
hosts_with_osds_to_remove.append({
|
||||
'uuid': host.uuid,
|
||||
'name': host.hostname,
|
||||
'devices': [{
|
||||
'device_path': device_path,
|
||||
'id': next((osd.get('id') for osd in osds_info
|
||||
if (device_path == osd.get('device_path') and
|
||||
host.id == osd.get('hostid'))), None),
|
||||
'uuid': next((osd.get('uuid') for osd in osds_info
|
||||
if (device_path == osd.get('device_path') and
|
||||
host.id == osd.get('hostid'))), None),
|
||||
'device_node': next((osd.get('device_node') for osd in osds_info
|
||||
if (device_path == osd.get('device_path') and
|
||||
host.id == osd.get('hostid'))), None)
|
||||
} for device_path in diff]
|
||||
})
|
||||
|
||||
LOG.info(f"OSDs to be removed: {hosts_with_osds_to_remove}")
|
||||
return hosts_with_osds_to_remove
|
||||
|
||||
def wait_redistribution(self, osd, wait_length=0, retries=50, delay=10):
|
||||
while retries > 0:
|
||||
cmd = 'ceph pg dump'.split(" ")
|
||||
stdout, stderr = cutils.trycmd(*cmd)
|
||||
if stderr and "dumped all" not in stderr:
|
||||
LOG.warning(f"Error when get pg dump. stderr: {stderr}")
|
||||
|
||||
# Patterns to check
|
||||
pattern_osd = re.compile(rf"\[.*{osd.get('id')}.*\]")
|
||||
pattern_status = re.compile(r"(degraded|undersized|recovering)")
|
||||
|
||||
# Checking lines
|
||||
osd_lines = [line for line in stdout.splitlines() if pattern_osd.search(line)]
|
||||
line_count_osd = len(osd_lines)
|
||||
line_count_status = len([line for line in osd_lines if pattern_status.search(line)])
|
||||
|
||||
LOG.info(f"Waiting for ceph redistribution. osd.{osd.get('id')} still has {line_count_osd} PGs")
|
||||
# Check PG count in OSD and if all PGs is health
|
||||
if line_count_osd <= wait_length and line_count_status <= 0:
|
||||
LOG.info(f"Ceph osd.{osd.get('id')} has been redistributed.")
|
||||
return True
|
||||
|
||||
retries -= 1
|
||||
sleep(delay)
|
||||
|
||||
LOG.info(f"Ceph osd.{osd.get('id')} redistribution failed.")
|
||||
return False
|
||||
|
||||
def remove_osds(self, dbapi, osds_to_remove, kube_apps, context):
|
||||
for host in osds_to_remove:
|
||||
LOG.info(f"Removing OSDs from host {host['name']}")
|
||||
for osd in host["devices"]:
|
||||
LOG.info(f"Removing osd.{osd.get('id')}")
|
||||
|
||||
# Change OSD to be out
|
||||
cmd = f"ceph osd out osd.{osd.get('id')}".split(" ")
|
||||
stdout, stderr = cutils.trycmd(*cmd)
|
||||
LOG.info(f"Changing osd.{osd.get('id')} to be OUT. stout: {stdout}, stdeer: {stderr}")
|
||||
|
||||
# Wait for ceph osd redistribution
|
||||
redistributed = self.wait_redistribution(osd, wait_length=10)
|
||||
|
||||
if not redistributed:
|
||||
# Change OSD to be in after redistribution failed
|
||||
cmd = f"ceph osd in osd.{osd.get('id')}".split(" ")
|
||||
stdout, stderr = cutils.trycmd(*cmd)
|
||||
LOG.info(f"Ceph osd.{osd.get('id')} redistribution failed!")
|
||||
LOG.info(f"Changing osd.{osd.get('id')} to be IN. stout: {stdout}, stdeer: {stderr}")
|
||||
break
|
||||
|
||||
# Scaling OSD deployment to 0
|
||||
self.scale_deployment(f"osd={osd.get('id')}", 0)
|
||||
|
||||
# Wait for ceph osd redistribution
|
||||
redistributed = self.wait_redistribution(osd, wait_length=0)
|
||||
if not redistributed:
|
||||
LOG.info(f"Ceph osd.{osd.get('id')} redistribution failed.")
|
||||
# Scaling OSD deployment to 1
|
||||
self.scale_deployment(f"osd={osd.get('id')}", 1)
|
||||
break
|
||||
|
||||
# Delete OSD deployment
|
||||
LOG.info(f"Deleting Ceph osd.{osd.get('id')} deployment")
|
||||
kube_apps.delete_namespaced_deployment(
|
||||
name=f"rook-ceph-osd-{osd.get('id')}",
|
||||
namespace=app_constants.HELM_NS_ROOK_CEPH,
|
||||
body=client.V1DeleteOptions(
|
||||
propagation_policy='Foreground',
|
||||
grace_period_seconds=0
|
||||
)
|
||||
)
|
||||
|
||||
# Purge OSD from ceph cluster
|
||||
cmd = f"ceph osd purge osd.{osd.get('id')} --yes-i-really-mean-it".split(" ")
|
||||
stdout, stderr = cutils.trycmd(*cmd)
|
||||
LOG.info(f"Purging osd.{osd.get('id')}. stout: {stdout}, stdeer: {stderr}")
|
||||
|
||||
# Delete OSD auth
|
||||
cmd = f"ceph auth del osd.{osd.get('id')}".split(" ")
|
||||
stdout, stderr = cutils.trycmd(*cmd)
|
||||
LOG.info(f"Removing auth of osd.{osd.get('id')}. stout: {stdout}, stdeer: {stderr}")
|
||||
|
||||
# Prepare disk (wipe disk)
|
||||
rpcapi = agent_rpcapi.AgentAPI()
|
||||
idisk = dbapi.idisk_get(osd.get('uuid'))
|
||||
rpcapi.disk_prepare(context, host.get('uuid'), idisk.as_dict(),
|
||||
True, False)
|
||||
|
||||
def get_osds_by_node(self, dbapi):
|
||||
ceph_rook_backend = app_utils.get_ceph_rook_backend(dbapi)
|
||||
hosts_by_deployment_model = self.get_hosts(
|
||||
dbapi, ceph_rook_backend.get('capabilities', {}))
|
||||
|
||||
nodes = []
|
||||
for host in hosts_by_deployment_model:
|
||||
new_node = {
|
||||
'name': host.hostname,
|
||||
'devices': []
|
||||
}
|
||||
istors = dbapi.istor_get_by_ihost(host.uuid)
|
||||
for stor in istors:
|
||||
if (stor.function == constants.STOR_FUNCTION_OSD and
|
||||
(stor.state == constants.SB_STATE_CONFIGURED or
|
||||
stor.state == constants.SB_STATE_CONFIGURING_WITH_APP)):
|
||||
idisk = dbapi.idisk_get(stor.idisk_uuid)
|
||||
new_node['devices'].append({
|
||||
'name': idisk.device_path
|
||||
})
|
||||
nodes.append(new_node)
|
||||
|
||||
return nodes
|
||||
|
||||
def pre_apply_raise_alarms(self, app_op):
|
||||
dbapi = app_op._dbapi
|
||||
fmapi = app_op._fm_api
|
||||
@@ -1118,8 +970,13 @@ class RookCephAppLifecycleOperator(base.AppLifecycleOperator):
|
||||
# CHECK, RAISE, and ALLOW: See if there is are missing OSDs
|
||||
replication_factor = self.get_data_replication_factor(ceph_rook_backend)
|
||||
host_with_osds_count, osds_count = self.get_osd_count(dbapi, ceph_rook_backend)
|
||||
deployment_model = ceph_rook_backend.get("capabilities", {}).get("deployment_model", "")
|
||||
|
||||
if cutils.is_aio_simplex_system(dbapi):
|
||||
# If the deployment model defined on storage backend is open
|
||||
# We do not raise alarms related to OSDs
|
||||
if deployment_model == app_constants.DEP_MODEL_OPEN:
|
||||
pass
|
||||
elif cutils.is_aio_simplex_system(dbapi):
|
||||
# Failure domain is OSDs
|
||||
if replication_factor > osds_count:
|
||||
self.handle_incomplete_config_alarm(
|
||||
@@ -1182,6 +1039,7 @@ class RookCephAppLifecycleOperator(base.AppLifecycleOperator):
|
||||
pass
|
||||
|
||||
def wait_for_mon_removal(self, mons_deleted, retries=180, delay=1):
|
||||
# TODO: Use has long running operations field on capabilities for monitors
|
||||
LOG.info("Waiting for monitors to be removed.")
|
||||
for mon in mons_deleted:
|
||||
while retries > 0:
|
||||
@@ -1346,6 +1204,7 @@ class RookCephAppLifecycleOperator(base.AppLifecycleOperator):
|
||||
new_mon_count = app_utils.count_monitor_assigned(dbapi)
|
||||
hosts = dbapi.ihost_get_list()
|
||||
|
||||
# TODO: Use has long running operations field on capabilities for monitors
|
||||
# Removes MON/MGR labels and deletes corresponding deployments
|
||||
# if the monitor function is absent in the ceph host-fs capabilities.
|
||||
# For the floating monitor, the removal process has been done on the
|
||||
@@ -1491,24 +1350,22 @@ class RookCephAppLifecycleOperator(base.AppLifecycleOperator):
|
||||
retries -= 1
|
||||
sleep(delay)
|
||||
|
||||
def update_osds(self, app_op, context):
|
||||
def remove_osds(self, app_op):
|
||||
dbapi = app_op._dbapi
|
||||
kube_apps = client.AppsV1Api()
|
||||
kube_custom = client.CustomObjectsApi()
|
||||
osd_deployments = kube_apps.list_namespaced_deployment(app_constants.HELM_NS_ROOK_CEPH,
|
||||
label_selector='app=rook-ceph-osd').items
|
||||
|
||||
custom_object = kube_custom.get_namespaced_custom_object(group="ceph.rook.io",
|
||||
version="v1",
|
||||
namespace=app_constants.HELM_NS_ROOK_CEPH,
|
||||
plural="cephclusters",
|
||||
name="rook-ceph")
|
||||
osds_to_remove = []
|
||||
istors = dbapi.istor_get_list(sort_key='osdid', sort_dir='asc')
|
||||
for istor in istors:
|
||||
if (istor.function == constants.STOR_FUNCTION_OSD and
|
||||
istor.state == constants.SB_STATE_DELETING_WITH_APP):
|
||||
disk = dbapi.idisk_get(istor.idisk_uuid)
|
||||
device_node = disk.device_node.replace("/dev/", "")
|
||||
istor_dict = istor.as_dict()
|
||||
istor_dict["hostname"] = dbapi.ihost_get(istor.forihostid).hostname
|
||||
istor_dict["idisk_node"] = disk.device_node
|
||||
istor_dict["target_name"] = device_node
|
||||
|
||||
# Get osds to be removed
|
||||
osds_to_remove = self.get_osds_to_remove(dbapi=dbapi,
|
||||
new_nodes=self.get_osds_by_node(dbapi),
|
||||
old_nodes=custom_object['spec']['storage']['nodes'],
|
||||
osd_deployments=osd_deployments)
|
||||
osds_to_remove.append(istor_dict)
|
||||
|
||||
if len(osds_to_remove) <= 0:
|
||||
LOG.info("No OSD to be removed")
|
||||
@@ -1516,53 +1373,172 @@ class RookCephAppLifecycleOperator(base.AppLifecycleOperator):
|
||||
|
||||
LOG.info("Removing OSDs from ceph cluster")
|
||||
|
||||
# Edit nodes from ceph cluster
|
||||
custom_object['spec']['storage']['nodes'] = self.get_osds_by_node(dbapi)
|
||||
kube_custom.patch_namespaced_custom_object(group="ceph.rook.io",
|
||||
version="v1",
|
||||
namespace=app_constants.HELM_NS_ROOK_CEPH,
|
||||
plural="cephclusters",
|
||||
name="rook-ceph",
|
||||
body=custom_object)
|
||||
# The osds will be removed by job
|
||||
self.create_job_to_remove_osds(osds_to_remove, False)
|
||||
|
||||
LOG.info('Waiting for Ceph Cluster Rook-Ceph in ready phase after node updated.')
|
||||
self.wait_for_cephcluster_phase(kube_custom, 'Progressing')
|
||||
correct_phase = self.wait_for_cephcluster_phase(kube_custom, 'Ready')
|
||||
if not correct_phase:
|
||||
LOG.info("Ceph Cluster Rook-Ceph is not in Ready phase! Canceling OSD removal...")
|
||||
return
|
||||
ceph_rook_backend = app_utils.get_ceph_rook_backend(dbapi)
|
||||
capabilities = ceph_rook_backend.get("capabilities")
|
||||
|
||||
# Scale rook-ceph-operator to 0
|
||||
self.scale_deployment("operator=rook", 0)
|
||||
# TODO: Use has long running operations field on capabilities for monitors
|
||||
capabilities["has_long_running_operations"] = True
|
||||
dbapi.storage_backend_update(ceph_rook_backend.uuid, {
|
||||
'capabilities': capabilities
|
||||
})
|
||||
|
||||
# Remove OSDs
|
||||
self.remove_osds(dbapi, osds_to_remove, kube_apps, context)
|
||||
self.handle_incomplete_config_alarm(
|
||||
app_op._fm_api, fm_constants.FM_ALARM_STATE_SET,
|
||||
app_constants.ALARM_TYPE_LONG_RUNNING_OPERATIONS_IN_PROGRESS)
|
||||
|
||||
# Scale rook-ceph-operator to 1
|
||||
self.scale_deployment("operator=rook", 1)
|
||||
LOG.info('Waiting for Ceph Cluster Rook-Ceph in ready phase after scale operator to 1.')
|
||||
self.wait_for_cephcluster_phase(kube_custom, 'Progressing')
|
||||
correct_phase = self.wait_for_cephcluster_phase(kube_custom, 'Ready')
|
||||
if correct_phase:
|
||||
LOG.info("Ceph Cluster Rook-Ceph is in Ready phase! The OSDs have been removed.")
|
||||
# Using -1s to ignore the timeout
|
||||
self.wait_jobs_by_label("app=ceph-remove-osds", "condition=complete", "-1s")
|
||||
|
||||
def wipe_all_osds(self, dbapi, context):
|
||||
# Checking ceph status
|
||||
is_ceph_responsive = True
|
||||
|
||||
cmd = "timeout 10 ceph status"
|
||||
stdout, stderr = cutils.trycmd(*cmd.split())
|
||||
|
||||
if stderr:
|
||||
is_ceph_responsive = False
|
||||
self.handle_incomplete_config_alarm(
|
||||
app_op._fm_api, fm_constants.FM_ALARM_STATE_SET,
|
||||
app_constants.ALARM_TYPE_REMOVE_OSDS_ABORTED)
|
||||
LOG.error("Error during OSD removal, Ceph is not responsive")
|
||||
|
||||
if is_ceph_responsive:
|
||||
self.handle_incomplete_config_alarm(
|
||||
app_op._fm_api, fm_constants.FM_ALARM_STATE_CLEAR,
|
||||
app_constants.ALARM_TYPE_REMOVE_OSDS_ABORTED)
|
||||
|
||||
for stor in osds_to_remove:
|
||||
|
||||
osd_id = stor['osdid']
|
||||
cmd = f"ceph osd find osd.{osd_id}".split(" ")
|
||||
stdout, stderr = cutils.trycmd(*cmd)
|
||||
|
||||
if not stderr:
|
||||
# The osd was found after removal, recover it.
|
||||
LOG.warn("The osd.{} was not deleted, recovered successfully".format(osd_id))
|
||||
dbapi.istor_update(stor["uuid"], {
|
||||
'state': constants.SB_STATE_CONFIGURED
|
||||
})
|
||||
else:
|
||||
LOG.info("Removing osd.{} from database".format(osd_id))
|
||||
|
||||
# Remove the stor from DB
|
||||
dbapi.istor_remove_disk_association(stor["uuid"])
|
||||
dbapi.istor_destroy(stor["uuid"])
|
||||
|
||||
# Remove from HostFS
|
||||
if len(dbapi.istor_get_all(stor["forihostid"])) == 0:
|
||||
fs = dbapi.host_fs_get_by_name_ihost(stor["forihostid"], constants.FILESYSTEM_NAME_CEPH)
|
||||
capabilities = fs.capabilities
|
||||
capabilities['functions'].remove(constants.FILESYSTEM_CEPH_FUNCTION_OSD)
|
||||
values = {'capabilities': capabilities}
|
||||
dbapi.host_fs_update(fs.uuid, values)
|
||||
|
||||
del capabilities["has_long_running_operations"]
|
||||
dbapi.storage_backend_update(ceph_rook_backend.uuid, {
|
||||
'capabilities': capabilities
|
||||
})
|
||||
|
||||
self.handle_incomplete_config_alarm(
|
||||
app_op._fm_api, fm_constants.FM_ALARM_STATE_CLEAR,
|
||||
app_constants.ALARM_TYPE_LONG_RUNNING_OPERATIONS_IN_PROGRESS)
|
||||
|
||||
def wipe_all_osds(self, dbapi):
|
||||
ceph_rook_backend = app_utils.get_ceph_rook_backend(dbapi)
|
||||
hosts_by_deployment_model = self.get_hosts(
|
||||
dbapi, ceph_rook_backend.get('capabilities', {}))
|
||||
has_disks_to_wipe = False
|
||||
|
||||
for host in hosts_by_deployment_model:
|
||||
istors = dbapi.istor_get_by_ihost(host.uuid)
|
||||
devices_to_wipe = []
|
||||
for stor in istors:
|
||||
if (stor.function == constants.STOR_FUNCTION_OSD and
|
||||
(stor.state == constants.SB_STATE_CONFIGURED or
|
||||
stor.state == constants.SB_STATE_CONFIGURING_WITH_APP)):
|
||||
# Prepare disk (wipe disk)
|
||||
rpcapi = agent_rpcapi.AgentAPI()
|
||||
idisk = dbapi.idisk_get(stor.get('idisk_uuid'))
|
||||
LOG.info("Wiping device %s from %s" % (idisk.get('device_node'), host.get('hostname')))
|
||||
rpcapi.disk_prepare(context, host.get('uuid'), idisk.as_dict(), True, False)
|
||||
sleep(5)
|
||||
devices_to_wipe.append(idisk.get('device_path'))
|
||||
has_disks_to_wipe = True
|
||||
|
||||
LOG.info("Preparing all OSDs from host %s to wipe" % host.hostname)
|
||||
|
||||
if len(devices_to_wipe) > 0:
|
||||
self.create_job_to_wipe_disks(host, devices_to_wipe)
|
||||
|
||||
if has_disks_to_wipe:
|
||||
self.wait_jobs_by_label("app=ceph-wipe-disks", "condition=complete")
|
||||
|
||||
def wait_jobs_by_label(self, label, for_arg, timeout_seconds="600s"):
|
||||
|
||||
cmd = 'kubectl --kubeconfig %s wait -n %s jobs -l %s --for=%s --timeout=%s' \
|
||||
% (kube_utils.KUBERNETES_ADMIN_CONF, app_constants.HELM_NS_ROOK_CEPH, label, for_arg, timeout_seconds)
|
||||
stdout, stderr = cutils.trycmd(*cmd.split())
|
||||
if not stderr or "no matching resources found" in stderr:
|
||||
return
|
||||
LOG.error("Error while waiting jobs for %s with label %s" % (for_arg, stderr))
|
||||
|
||||
def prepare_disks_to_apply(self, app_op):
|
||||
|
||||
dbapi = app_op._dbapi
|
||||
ceph_rook_backend = app_utils.get_ceph_rook_backend(dbapi)
|
||||
|
||||
hosts_by_deployment_model = self.get_hosts(
|
||||
dbapi, ceph_rook_backend.get('capabilities', {}))
|
||||
|
||||
LOG.info("Preparing disks...")
|
||||
|
||||
osds_to_remove = []
|
||||
has_disks_to_wipe = False
|
||||
|
||||
for host in hosts_by_deployment_model:
|
||||
|
||||
istors = dbapi.istor_get_by_ihost(host.uuid)
|
||||
devices_to_wipe = []
|
||||
|
||||
for stor in istors:
|
||||
if (stor.state == constants.SB_STATE_CONFIGURING_WITH_APP):
|
||||
|
||||
idisk = dbapi.idisk_get(stor.idisk_uuid)
|
||||
|
||||
device_node = idisk.device_node.replace("/dev/", "")
|
||||
cmd = f"ceph device ls-by-host {host.hostname}".split(" ")
|
||||
stdout, stderr = cutils.trycmd(*cmd)
|
||||
|
||||
# Checking if ceph is enabled and the OSD must be removed
|
||||
if not stderr and f" {device_node} " in stdout.strip():
|
||||
|
||||
# In case of a reinstall the OSD needs to be deleted and recreated
|
||||
stor_dict = stor.as_dict()
|
||||
stor_dict["hostname"] = host.hostname
|
||||
stor_dict["idisk_node"] = idisk.device_node
|
||||
stor_dict["target_name"] = device_node
|
||||
|
||||
osds_to_remove.append(stor_dict)
|
||||
continue
|
||||
|
||||
devices_to_wipe.append(idisk.device_path)
|
||||
has_disks_to_wipe = True
|
||||
|
||||
LOG.info("Preparing disks %s from host %s to apply app" % (devices_to_wipe, host.hostname))
|
||||
|
||||
if len(devices_to_wipe) > 0:
|
||||
self.create_job_to_wipe_disks(host, devices_to_wipe)
|
||||
|
||||
if has_disks_to_wipe:
|
||||
self.wait_jobs_by_label("app=ceph-wipe-disks", "condition=complete")
|
||||
|
||||
if len(osds_to_remove) > 0:
|
||||
# The operator needs to be down during this OSD recreation
|
||||
self.scale_deployment("operator=rook", 0, wait=True)
|
||||
|
||||
self.create_job_to_remove_osds(osds_to_remove, True)
|
||||
self.wait_jobs_by_label("app=ceph-remove-osds", "condition=complete")
|
||||
|
||||
self.scale_deployment("operator=rook", 1, wait=True)
|
||||
|
||||
def get_osd_count(self, dbapi, ceph_rook_backend):
|
||||
hosts_by_deployment_model = self.get_hosts(
|
||||
@@ -1810,6 +1786,7 @@ class RookCephAppLifecycleOperator(base.AppLifecycleOperator):
|
||||
)
|
||||
|
||||
def create_job_to_rm_mon_data(self, hostname, mon_name):
|
||||
# TODO: Use has long running operations field on capabilities for monitors
|
||||
LOG.info("Creating job to remove mon-%s data from %s" % (mon_name, hostname))
|
||||
remove_mon_job_template = self.get_rm_mon_data_job_template()
|
||||
remove_mon_job_resource = remove_mon_job_template.safe_substitute({'TARGET_HOSTNAME': hostname,
|
||||
@@ -1870,3 +1847,167 @@ class RookCephAppLifecycleOperator(base.AppLifecycleOperator):
|
||||
- mountPath: /var/lib/rook
|
||||
name: rook-data
|
||||
""")
|
||||
|
||||
def get_kube_resource(self, kube_client, name, namespace, resource_type):
|
||||
configmap = None
|
||||
read_method = getattr(kube_client, f"read_namespaced_{resource_type}")
|
||||
try:
|
||||
configmap = read_method(name=name, namespace=namespace)
|
||||
except Exception as err:
|
||||
if not isinstance(err, client.exceptions.ApiException) or err.status != 404:
|
||||
LOG.error("Exception raised when getting remove osds utils config map: %s" % err)
|
||||
|
||||
return configmap
|
||||
|
||||
def delete_kube_resource(self, kube_client, name, namespace, resource_type, timeout_seconds=600):
|
||||
watch = kube_watch()
|
||||
|
||||
delete_method = getattr(kube_client, f"delete_namespaced_{resource_type}")
|
||||
list_method = getattr(kube_client, f"list_namespaced_{resource_type}")
|
||||
|
||||
try:
|
||||
delete_method(
|
||||
name=name,
|
||||
namespace=namespace,
|
||||
body=client.V1DeleteOptions(propagation_policy="Foreground", grace_period_seconds=0)
|
||||
)
|
||||
with Timeout(timeout_seconds):
|
||||
for event in watch.stream(list_method, namespace=namespace):
|
||||
if event['object'].metadata.name == name and event['type'] == "DELETED":
|
||||
watch.stop()
|
||||
LOG.info("The %s %s was deleted succesfully." % (name, resource_type))
|
||||
return
|
||||
except Timeout:
|
||||
LOG.error("Timeout reached while waiting for %s %s to be deleted" % (name, resource_type))
|
||||
watch.stop()
|
||||
return
|
||||
except Exception as err:
|
||||
if not isinstance(err, client.exceptions.ApiException) or err.status != 404:
|
||||
LOG.error("Exception raised from deleting %s %s: %s" % (name, resource_type, err))
|
||||
return
|
||||
|
||||
def read_template(self, template_name):
|
||||
template_path = path.join(path.dirname(__file__), "templates", template_name)
|
||||
with open(template_path, "r") as file:
|
||||
return Template(file.read())
|
||||
|
||||
def create_kube_resource_file(self, path, value):
|
||||
output_file = open(path, 'w')
|
||||
output_file.write(value)
|
||||
output_file.close()
|
||||
|
||||
def delete_kube_resource_file(self, file_path):
|
||||
if path.exists(file_path):
|
||||
os_remove(file_path)
|
||||
|
||||
def create_job_to_remove_osds(self, osd_to_remove, is_reinstalling):
|
||||
|
||||
kube_batch = client.BatchV1Api()
|
||||
kube_client = client.ApiClient()
|
||||
kube_core = client.CoreV1Api()
|
||||
|
||||
cm_remove_osds_utils_template = self.read_template("cm-remove-osds-utils.yaml")
|
||||
job_remove_osds_template = self.read_template("job-remove-osds.yaml")
|
||||
job_wipe_disks_template = self.read_template("job-wipe-disks.yaml")
|
||||
|
||||
job_remove_osds_resource_path = '/tmp/job-remove-osds.yaml'
|
||||
cm_remove_osds_utils_resource_path = '/tmp/cm-remove-osds-utils.yaml'
|
||||
|
||||
# Creating yaml file using templates to create k8s resources
|
||||
remove_osds_resource = job_remove_osds_template.safe_substitute({'TARGET_OSDS_TO_REMOVE':
|
||||
repr(str(osd_to_remove)),
|
||||
'TARGET_IS_REINSTALLING':
|
||||
repr(str(is_reinstalling).lower())})
|
||||
|
||||
self.create_kube_resource_file(job_remove_osds_resource_path, remove_osds_resource)
|
||||
self.create_kube_resource_file(cm_remove_osds_utils_resource_path, cm_remove_osds_utils_template.template)
|
||||
|
||||
self.delete_kube_resource(kube_core, name="ceph-remove-osds-utils", namespace=app_constants.HELM_NS_ROOK_CEPH,
|
||||
resource_type="config_map")
|
||||
|
||||
# Creating config map with utils to remove OSDs
|
||||
try:
|
||||
utils.create_from_yaml(kube_client, cm_remove_osds_utils_resource_path)
|
||||
except Exception as err:
|
||||
LOG.error("Exception raised from creating remove osds utils config map: %s" % err)
|
||||
return
|
||||
|
||||
# Adding wipe disks job to the config map used by remove OSDs job
|
||||
patch_data = {
|
||||
"data": {
|
||||
"job-wipe-disks.yaml": job_wipe_disks_template.template
|
||||
}
|
||||
}
|
||||
try:
|
||||
kube_core.patch_namespaced_config_map(
|
||||
name="ceph-remove-osds-utils",
|
||||
namespace=app_constants.HELM_NS_ROOK_CEPH,
|
||||
body=patch_data,
|
||||
)
|
||||
LOG.info("The config map was updated to contain the wipe disks job")
|
||||
except Exception as err:
|
||||
if not isinstance(err, client.exceptions.ApiException) or err.status != 404:
|
||||
LOG.error("Exception raised from updating config map: %s" % err)
|
||||
return
|
||||
|
||||
# Deleting job if exists
|
||||
self.delete_kube_resource(kube_batch, name="ceph-remove-osds", namespace=app_constants.HELM_NS_ROOK_CEPH,
|
||||
resource_type="job")
|
||||
|
||||
# Creating job to remove OSDs
|
||||
try:
|
||||
utils.create_from_yaml(kube_client, job_remove_osds_resource_path)
|
||||
except Exception as err:
|
||||
LOG.error("Exception raised from creating remove osds job: %s" % err)
|
||||
return
|
||||
|
||||
# Deleting yaml files used to create job and config map
|
||||
self.delete_kube_resource_file(job_remove_osds_resource_path)
|
||||
self.delete_kube_resource_file(cm_remove_osds_utils_resource_path)
|
||||
|
||||
def create_job_to_wipe_disks(self, host, disks):
|
||||
|
||||
kube_batch = client.BatchV1Api()
|
||||
kube_client = client.ApiClient()
|
||||
kube_core = client.CoreV1Api()
|
||||
|
||||
hostname = host.hostname
|
||||
host_id = host.id
|
||||
|
||||
LOG.info("Wiping devices %s from host %s" % (disks, hostname))
|
||||
|
||||
job_wipe_disks_template = self.read_template("job-wipe-disks.yaml")
|
||||
sa_wipe_disks_template = self.read_template("sa-wipe-disks.yaml")
|
||||
|
||||
job_wipe_disks_resource_path = '/tmp/job-wipe-disks-{}-{}.yaml'.format(hostname, host_id)
|
||||
sa_wipe_disks_resource_path = '/tmp/sa-wipe-disks.yaml'
|
||||
|
||||
disks_str = ' '.join(disks)
|
||||
job_wipe_disks_resource = job_wipe_disks_template.safe_substitute({'TARGET_HOSTNAME': hostname,
|
||||
'TARGET_NAME': host_id,
|
||||
'TARGET_DISKS': disks_str})
|
||||
self.create_kube_resource_file(job_wipe_disks_resource_path, job_wipe_disks_resource)
|
||||
self.create_kube_resource_file(sa_wipe_disks_resource_path, sa_wipe_disks_template.template)
|
||||
|
||||
self.delete_kube_resource(kube_batch, name="ceph-wipe-disks-%s-%s" % (hostname, host_id),
|
||||
namespace=app_constants.HELM_NS_ROOK_CEPH, resource_type="job")
|
||||
|
||||
sa = self.get_kube_resource(kube_core, name="ceph-wipe-disks",
|
||||
namespace=app_constants.HELM_NS_ROOK_CEPH,
|
||||
resource_type="service_account")
|
||||
if not sa:
|
||||
try:
|
||||
LOG.info("Creating k8s resources from yaml %s." % sa_wipe_disks_resource_path)
|
||||
utils.create_from_yaml(kube_client, sa_wipe_disks_resource_path)
|
||||
except Exception as err:
|
||||
LOG.error("Exception raised from creating wipe disk service account: %s" % err)
|
||||
|
||||
try:
|
||||
LOG.info("Creating k8s resources from yaml %s." % job_wipe_disks_resource_path)
|
||||
utils.create_from_yaml(kube_client, job_wipe_disks_resource_path)
|
||||
except Exception as err:
|
||||
LOG.error("Exception raised from creating wipe disk job on host %s: %s" % (hostname, err))
|
||||
|
||||
# Deleting yaml files used to create job and config map
|
||||
self.delete_kube_resource_file(job_wipe_disks_resource_path)
|
||||
self.delete_kube_resource_file(sa_wipe_disks_resource_path)
|
||||
|
@@ -0,0 +1,233 @@
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: ConfigMap
|
||||
metadata:
|
||||
name: ceph-remove-osds-utils
|
||||
namespace: rook-ceph
|
||||
data:
|
||||
remove-osds-script.sh: |
|
||||
#!/bin/bash
|
||||
|
||||
apply_job() {
|
||||
job_path=${1}
|
||||
kubectl --kubeconfig /etc/kubernetes/admin.conf apply -f ${job_path}
|
||||
}
|
||||
|
||||
wait_redistribution() {
|
||||
|
||||
osdid=${1}
|
||||
|
||||
# We are using retries to check if the redistribution is not stuck
|
||||
# Checking if the ANY PGs has been redistributed.
|
||||
|
||||
delay=30s
|
||||
initial_retries_count=60
|
||||
retries=${initial_retries_count}
|
||||
|
||||
last_pgs_count=0
|
||||
|
||||
while (( retries > 0)); do
|
||||
|
||||
pgs_dump=$(try_ceph_cmd "ceph pg ls-by-osd osd.${osdid}")
|
||||
|
||||
if [[ $? -ne 0 ]]; then
|
||||
echo "Error when getting pg ls-by-osd: $pgs_dump"
|
||||
return 1
|
||||
fi
|
||||
|
||||
# Count the PGs from osd ignoring headers and footers
|
||||
pgs_count=$(echo "${pgs_dump}" | sed -n 's/^\([0-9a-f]\+\.[0-9a-f]\+\).*$/\1/p' | wc -l)
|
||||
|
||||
$(try_ceph_cmd "ceph osd safe-to-destroy osd.${osdid}")
|
||||
exit_code=$?
|
||||
if [[ "${pgs_count}" -eq 0 && "${exit_code}" -eq 0 ]]; then
|
||||
echo "Ceph osd.${osdid} has been redistributed."
|
||||
return 0
|
||||
fi
|
||||
|
||||
if [ "${pgs_count}" -eq "${last_pgs_count}" ]; then
|
||||
if [ ${retries} -eq 0 ]; then
|
||||
return 1
|
||||
fi
|
||||
retries=$((retries - 1))
|
||||
else
|
||||
# Resetting the retries because at least one PAGE was redistributed
|
||||
retries=${initial_retries_count}
|
||||
fi
|
||||
|
||||
echo "Ceph osd.${osdid} is in redistribution state, PGs count: ${pgs_count}"
|
||||
last_pgs_count=${pgs_count}
|
||||
sleep "${delay}"
|
||||
done
|
||||
}
|
||||
|
||||
wait_for_osd_down() {
|
||||
osdid=${1}
|
||||
delay=5s
|
||||
while true; do
|
||||
sleep "${delay}"
|
||||
|
||||
pgs_dump=$(try_ceph_cmd "ceph pg ls-by-osd osd.${osdid}")
|
||||
exit_code=$?
|
||||
|
||||
# exit code 11 = Error EAGAIN: osd 0 is not up
|
||||
if [ "${exit_code}" -eq 11 ]; then
|
||||
echo "Ceph osd.${osdid} is down."
|
||||
return 0
|
||||
fi
|
||||
|
||||
if [ "${exit_code}" -ne 0 ]; then
|
||||
echo "Error when getting pg ls-by-osd: $pgs_dump"
|
||||
return 1
|
||||
fi
|
||||
done
|
||||
}
|
||||
|
||||
recovery_osd_removal() {
|
||||
osdid=${1}
|
||||
kubectl --kubeconfig /etc/kubernetes/admin.conf scale deployment rook-ceph-osd-${osdid} -n rook-ceph --replicas=1
|
||||
try_ceph_cmd "ceph osd in osd.${osdid}"
|
||||
}
|
||||
|
||||
scale_deployment_by_label() {
|
||||
label=${1}
|
||||
replica=${2}
|
||||
kubectl --kubeconfig /etc/kubernetes/admin.conf scale deployment -n rook-ceph -l ${label} --replicas=${replica}
|
||||
}
|
||||
|
||||
check_conditions() {
|
||||
# TODO: Include more check conditions to execute before remove OSDs
|
||||
osdid=${1}
|
||||
|
||||
tree=$(timeout --preserve-status "30s" ceph osd tree)
|
||||
exit_code=$?
|
||||
if [ "${exit_code}" -ne 0 ]; then
|
||||
echo "The osd.${osdid} cannot be deleted, because the Ceph cluster is not responding. Aborting OSD removal."
|
||||
return 1
|
||||
fi
|
||||
|
||||
$(echo "${tree}" | grep -q "osd.${osdid}")
|
||||
exit_code=$?
|
||||
if [ "${exit_code}" -ne 0 ]; then
|
||||
echo "The osd.${osdid} cannot be deleted, because this OSD is not on ceph cluster."
|
||||
return 2
|
||||
fi
|
||||
|
||||
echo "The OSD removal operation will continue to delete osd.${osd_id}."
|
||||
return 0
|
||||
}
|
||||
|
||||
try_ceph_cmd() {
|
||||
ceph_command="$1"
|
||||
timeout=30
|
||||
retries=45
|
||||
delay=10
|
||||
|
||||
while (( retries > 0)); do
|
||||
result=$(timeout --preserve-status "${timeout}s" ${ceph_command})
|
||||
exit_code=$?
|
||||
if [ $exit_code -eq 124 ]; then
|
||||
retries=$((retries - 1))
|
||||
# Ceph command timed out. Recover your cluster to continue. Retrying in ${delay}s...
|
||||
sleep "${delay}s"
|
||||
else
|
||||
echo "${result}"
|
||||
return "${exit_code}"
|
||||
fi
|
||||
done
|
||||
}
|
||||
|
||||
# Formatting json to be used
|
||||
OSDS_TO_REMOVE=$(echo "$OSDS_TO_REMOVE" | sed "s/'/\"/g; s/datetime\.datetime([^)]*)/\"\"/g")
|
||||
OSDS_UUIDS_TO_REMOVE=$(echo "${OSDS_TO_REMOVE}" | jq -r '.[].uuid')
|
||||
|
||||
for osd_uuid in ${OSDS_UUIDS_TO_REMOVE}; do
|
||||
osd_id=$(echo "${OSDS_TO_REMOVE}" | jq -r ".[] | select(.uuid == \"${osd_uuid}\") | .osdid")
|
||||
target_name=$(echo "${OSDS_TO_REMOVE}" | jq -r ".[] | select(.uuid == \"${osd_uuid}\") | .target_name")
|
||||
osd_hostname=$(echo "${OSDS_TO_REMOVE}" | jq -r ".[] | select(.uuid == \"${osd_uuid}\") | .hostname")
|
||||
osd_idisk_uuid=$(echo "${OSDS_TO_REMOVE}" | jq -r ".[] | select(.uuid == \"${osd_uuid}\") | .idisk_uuid")
|
||||
osd_idisk_node=$(echo "${OSDS_TO_REMOVE}" | jq -r ".[] | select(.uuid == \"${osd_uuid}\") | .idisk_node")
|
||||
|
||||
echo "Removing osd.${osd_id} from host ${osd_hostname}"
|
||||
|
||||
res=$(check_conditions "${osd_id}")
|
||||
exit_code=$?
|
||||
echo "${res}"
|
||||
case ${exit_code} in
|
||||
0)
|
||||
# The OSD can be deleted.
|
||||
;;
|
||||
1)
|
||||
# The Ceph cluster is not responding.
|
||||
continue
|
||||
;;
|
||||
2)
|
||||
# The current OSD not exists on Ceph cluster
|
||||
continue
|
||||
;;
|
||||
*)
|
||||
echo "The osd.${osd_id} cannot be deleted, because a unknown issue happened."
|
||||
continue
|
||||
;;
|
||||
esac
|
||||
|
||||
try_ceph_cmd "ceph osd out osd.${osd_id}"
|
||||
|
||||
# Wait redistribution is ignored because this osd has already been wiped in host-reinstall action
|
||||
if [ "${IS_REINSTALLING}" = "false" ]; then
|
||||
wait_redistribution ${osd_id}
|
||||
if [ $? -eq 1 ]; then
|
||||
echo "The osd.${osd_id} FAILED to redistribute data"
|
||||
recovery_osd_removal ${osd_id}
|
||||
continue
|
||||
fi
|
||||
fi
|
||||
|
||||
scale_deployment_by_label "osd=${osd_id}" 0
|
||||
|
||||
wait_for_osd_down ${osd_id}
|
||||
if [ $? -eq 1 ]; then
|
||||
echo "The osd.${osd_id} could not be in a down state"
|
||||
recovery_osd_removal ${osd_id}
|
||||
continue
|
||||
fi
|
||||
|
||||
res=$(check_conditions "${osd_id}")
|
||||
exit_code=$?
|
||||
echo "${res}"
|
||||
case ${exit_code} in
|
||||
0)
|
||||
# The OSD can be deleted.
|
||||
;;
|
||||
1)
|
||||
# The Ceph cluster is not responding.
|
||||
continue
|
||||
;;
|
||||
2)
|
||||
# The current OSD not exists on Ceph cluster
|
||||
continue
|
||||
;;
|
||||
*)
|
||||
echo "The osd.${osd_id} cannot be deleted, because a unknown issue happened."
|
||||
continue
|
||||
;;
|
||||
esac
|
||||
|
||||
kubectl --kubeconfig /etc/kubernetes/admin.conf delete deployment rook-ceph-osd-${osd_id} -n rook-ceph
|
||||
|
||||
try_ceph_cmd "ceph osd purge osd.${osd_id} --yes-i-really-mean-it"
|
||||
try_ceph_cmd "ceph auth del osd.${osd_id}"
|
||||
|
||||
# Delete job to guarantee that the job not exists
|
||||
kubectl --kubeconfig /etc/kubernetes/admin.conf delete job -n rook-ceph ceph-wipe-disks-${osd_hostname}-${target_name}
|
||||
|
||||
echo "Creating a job to wipe ${osd_idisk_node} disk"
|
||||
cp /jobs/job-wipe-disks.yaml /temp-dir/job-wipe-disks-${osd_uuid}.yaml
|
||||
sed -i "s|\$TARGET_HOSTNAME|${osd_hostname}|g" /temp-dir/job-wipe-disks-${osd_uuid}.yaml
|
||||
sed -i "s|\$TARGET_NAME|${target_name}|g" /temp-dir/job-wipe-disks-${osd_uuid}.yaml
|
||||
sed -i "s|\$TARGET_DISKS|${osd_idisk_node}|g" /temp-dir/job-wipe-disks-${osd_uuid}.yaml
|
||||
apply_job /temp-dir/job-wipe-disks-${osd_uuid}.yaml
|
||||
done
|
||||
|
||||
kubectl --kubeconfig /etc/kubernetes/admin.conf wait jobs --for=condition=complete -n rook-ceph -l app=ceph-wipe-disks
|
||||
exit 0
|
@@ -0,0 +1,83 @@
|
||||
---
|
||||
apiVersion: batch/v1
|
||||
kind: Job
|
||||
metadata:
|
||||
name: ceph-remove-osds
|
||||
namespace: rook-ceph
|
||||
labels:
|
||||
app: ceph-remove-osds
|
||||
spec:
|
||||
ttlSecondsAfterFinished: 600
|
||||
template:
|
||||
metadata:
|
||||
namespace: rook-ceph
|
||||
spec:
|
||||
serviceAccountName: rook-ceph-provisioner
|
||||
nodeSelector: { node-role.kubernetes.io/control-plane: "" }
|
||||
tolerations:
|
||||
- effect: NoSchedule
|
||||
operator: Exists
|
||||
key: node-role.kubernetes.io/master
|
||||
- effect: NoSchedule
|
||||
operator: Exists
|
||||
key: node-role.kubernetes.io/control-plane
|
||||
restartPolicy: Never
|
||||
volumes:
|
||||
- name: ceph-remove-osds-script
|
||||
configMap:
|
||||
name: ceph-remove-osds-utils
|
||||
items:
|
||||
- key: remove-osds-script.sh
|
||||
path: remove-osds-script.sh
|
||||
- name: ceph-wipe-disks-job
|
||||
configMap:
|
||||
name: ceph-remove-osds-utils
|
||||
items:
|
||||
- key: job-wipe-disks.yaml
|
||||
path: job-wipe-disks.yaml
|
||||
- name: temp-dir
|
||||
emptyDir: {}
|
||||
- name: ceph-bin
|
||||
hostPath:
|
||||
path: /usr/local/bin/ceph
|
||||
type: File
|
||||
- name: rook-ceph-flag
|
||||
hostPath:
|
||||
path: /etc/platform/.node_rook_configured
|
||||
type: File
|
||||
- name: platform-conf
|
||||
hostPath:
|
||||
path: /etc/platform/platform.conf
|
||||
type: File
|
||||
- name: kube-conf
|
||||
hostPath:
|
||||
path: /etc/kubernetes/admin.conf
|
||||
type: File
|
||||
containers:
|
||||
- name: remove-osds
|
||||
image: registry.local:9001/docker.io/openstackhelm/ceph-config-helper:ubuntu_jammy_18.2.2-1-20240312
|
||||
command: [ "/bin/bash", "/scripts/remove-osds-script.sh" ]
|
||||
env:
|
||||
- name: OSDS_TO_REMOVE
|
||||
value: $TARGET_OSDS_TO_REMOVE
|
||||
- name: IS_REINSTALLING
|
||||
value: $TARGET_IS_REINSTALLING
|
||||
volumeMounts:
|
||||
- name: ceph-remove-osds-script
|
||||
mountPath: /scripts
|
||||
- name: ceph-wipe-disks-job
|
||||
mountPath: /jobs
|
||||
- name: temp-dir
|
||||
mountPath: /temp-dir
|
||||
- name: ceph-bin
|
||||
mountPath: /usr/local/bin/ceph
|
||||
readOnly: true
|
||||
- name: rook-ceph-flag
|
||||
mountPath: /etc/platform/.node_rook_configured
|
||||
readOnly: true
|
||||
- name: platform-conf
|
||||
mountPath: /etc/platform/platform.conf
|
||||
readOnly: true
|
||||
- name: kube-conf
|
||||
mountPath: /etc/kubernetes/admin.conf
|
||||
readOnly: true
|
@@ -0,0 +1,48 @@
---
apiVersion: batch/v1
kind: Job
metadata:
  name: ceph-wipe-disks-$TARGET_HOSTNAME-$TARGET_NAME
  namespace: rook-ceph
  labels:
    app: ceph-wipe-disks
spec:
  ttlSecondsAfterFinished: 60
  template:
    metadata:
      namespace: rook-ceph
    spec:
      serviceAccountName: ceph-wipe-disks
      nodeSelector:
        kubernetes.io/hostname: $TARGET_HOSTNAME
      tolerations:
        - effect: NoSchedule
          operator: Exists
          key: node-role.kubernetes.io/master
        - effect: NoSchedule
          operator: Exists
          key: node-role.kubernetes.io/control-plane
      restartPolicy: Never
      volumes:
        - hostPath:
            path: /dev
            type: ""
          name: device
      containers:
        - name: remove
          image: registry.local:9001/quay.io/ceph/ceph:v18.2.2
          command: ["/bin/bash"]
          args: [
            "-c",
            "for dev in $disks; do wipefs -a -f $dev; sgdisk --zap-all $dev; done"
          ]
          env:
            - name: disks
              value: $TARGET_DISKS
          securityContext:
            privileged: true
            readOnlyRootFilesystem: false
            runAsUser: 0
          volumeMounts:
            - mountPath: /dev
              name: device
@@ -0,0 +1,8 @@
---
apiVersion: v1
kind: ServiceAccount
imagePullSecrets:
  - name: default-registry-key
metadata:
  name: ceph-wipe-disks
  namespace: rook-ceph
@@ -1,5 +1,5 @@
#
# Copyright (c) 2024 Wind River Systems, Inc.
# Copyright (c) 2024-2025 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@@ -9,4 +9,8 @@ import setuptools

setuptools.setup(
    setup_requires=['pbr>=0.5'],
    pbr=True)
    pbr=True,
    include_package_data=True,
    package_data={
        'k8sapp_rook_ceph': ['lifecycle/templates/*.yaml']
    },)