Merge "Add support for replacing a failed OSD drive"

This commit is contained in:
Jenkins 2016-03-21 15:15:32 +00:00 committed by Gerrit Code Review
commit f74a4a6df7
9 changed files with 316 additions and 5 deletions

1
.gitignore vendored
View File

@ -4,4 +4,5 @@
.testrepository
bin
*.sw[nop]
.idea
*.pyc

12
actions.yaml Normal file
View File

@ -0,0 +1,12 @@
replace-osd:
description: Replace a failed osd with a fresh disk
params:
osd-number:
type: integer
description: The osd number to operate on. Example 99. Hint you can get this information from `ceph osd tree`.
replacement-device:
type: string
description: The replacement device to use. Example /dev/sdb.
required: [osd-number, replacement-device]
additionalProperties: false

3
actions/__init__.py Normal file
View File

@ -0,0 +1,3 @@
__author__ = 'chris'
import sys
sys.path.append('hooks')

1
actions/replace-osd Symbolic link
View File

@ -0,0 +1 @@
replace_osd.py

84
actions/replace_osd.py Executable file
View File

@ -0,0 +1,84 @@
#!/usr/bin/python
from charmhelpers.core.hookenv import action_get, log, config, action_fail
__author__ = 'chris'
import os
import sys
sys.path.append('hooks')
import ceph
"""
Given a OSD number this script will attempt to turn that back into a mount
point and then replace the OSD with a new one.
"""
def get_disk_stats():
try:
# https://www.kernel.org/doc/Documentation/iostats.txt
with open('/proc/diskstats', 'r') as diskstats:
return diskstats.readlines()
except IOError as err:
log('Could not open /proc/diskstats. Error: {}'.format(err.message))
action_fail('replace-osd failed because /proc/diskstats could not '
'be opened {}'.format(err.message))
return None
def lookup_device_name(major_number, minor_number):
"""
:param major_number: int. The major device number
:param minor_number: int. The minor device number
:return: string. The name of the device. Example: /dev/sda.
Returns None on error.
"""
diskstats = get_disk_stats()
for line in diskstats:
parts = line.split()
if not len(parts) > 3:
# Skip bogus lines
continue
try:
if int(parts[0]) is major_number and int(parts[1]) is \
minor_number:
# Found our device. Return its name
return parts[2]
except ValueError as value_err:
log('Could not convert {} or {} into an integer. Error: {}'
.format(parts[0], parts[1], value_err.message))
continue
return None
def get_device_number(osd_number):
"""
This function will return a tuple of (major_number, minor_number)
device number for the given osd.
:param osd_number: int
:rtype : (major_number,minor_number)
"""
path = "/var/lib/ceph/osd/ceph-{}".format(osd_number)
info = os.lstat(path)
major_number = os.major(info.st_dev)
minor_number = os.minor(info.st_dev)
return major_number, minor_number
if __name__ == '__main__':
dead_osd_number = action_get("osd-number")
replacement_device = action_get("replacement-device")
major, minor = get_device_number(dead_osd_number)
device_name = lookup_device_name(major, minor)
osd_format = config('osd-format')
osd_journal = config('osd-journal')
ceph.replace_osd(dead_osd_number=dead_osd_number,
dead_osd_device="/dev/{}".format(device_name),
new_osd_device=replacement_device,
osd_format=osd_format,
osd_journal=osd_journal)

View File

@ -1,4 +1,3 @@
#
# Copyright 2012 Canonical Ltd.
#
@ -6,19 +5,24 @@
# James Page <james.page@canonical.com>
# Paul Collins <paul.collins@canonical.com>
#
import ctypes
import ctypes.util
import errno
import json
import subprocess
import time
import os
import re
import sys
import shutil
from charmhelpers.cli.host import mounts
from charmhelpers.core.host import (
mkdir,
chownr,
service_restart,
cmp_pkgrevno,
lsb_release
lsb_release,
service_stop
)
from charmhelpers.core.hookenv import (
log,
@ -64,7 +68,7 @@ def get_version():
pkg = cache[package]
except:
# the package is unknown to the current apt cache.
e = 'Could not determine version of package with no installation '\
e = 'Could not determine version of package with no installation ' \
'candidate: %s' % package
error_out(e)
@ -165,6 +169,7 @@ def add_bootstrap_hint(peer):
# Ignore any errors for this call
subprocess.call(cmd)
DISK_FORMATS = [
'xfs',
'ext4',
@ -178,6 +183,97 @@ CEPH_PARTITIONS = [
]
def umount(mount_point):
"""
This function unmounts a mounted directory forcibly. This will
be used for unmounting broken hard drive mounts which may hang.
If umount returns EBUSY this will lazy unmount.
:param mount_point: str. A String representing the filesystem mount point
:return: int. Returns 0 on success. errno otherwise.
"""
libc_path = ctypes.util.find_library("c")
libc = ctypes.CDLL(libc_path, use_errno=True)
# First try to umount with MNT_FORCE
ret = libc.umount(mount_point, 1)
if ret < 0:
err = ctypes.get_errno()
if err == errno.EBUSY:
# Detach from try. IE lazy umount
ret = libc.umount(mount_point, 2)
if ret < 0:
err = ctypes.get_errno()
return err
return 0
else:
return err
return 0
def replace_osd(dead_osd_number,
dead_osd_device,
new_osd_device,
osd_format,
osd_journal,
reformat_osd=False,
ignore_errors=False):
"""
This function will automate the replacement of a failed osd disk as much
as possible. It will revoke the keys for the old osd, remove it from the
crush map and then add a new osd into the cluster.
:param dead_osd_number: The osd number found in ceph osd tree. Example: 99
:param dead_osd_device: The physical device. Example: /dev/sda
:param osd_format:
:param osd_journal:
:param reformat_osd:
:param ignore_errors:
"""
host_mounts = mounts()
mount_point = None
for mount in host_mounts:
if mount[1] == dead_osd_device:
mount_point = mount[0]
# need to convert dev to osd number
# also need to get the mounted drive so we can tell the admin to
# replace it
try:
# Drop this osd out of the cluster. This will begin a
# rebalance operation
status_set('maintenance', 'Removing osd {}'.format(dead_osd_number))
subprocess.check_output(['ceph', 'osd', 'out',
'osd.{}'.format(dead_osd_number)])
# Kill the osd process if it's not already dead
if systemd():
service_stop('ceph-osd@{}'.format(dead_osd_number))
else:
subprocess.check_output(['stop', 'ceph-osd', 'id={}'.format(
dead_osd_number)]),
# umount if still mounted
ret = umount(mount_point)
if ret < 0:
raise RuntimeError('umount {} failed with error: {}'.format(
mount_point, os.strerror(ret)))
# Clean up the old mount point
shutil.rmtree(mount_point)
subprocess.check_output(['ceph', 'osd', 'crush', 'remove',
'osd.{}'.format(dead_osd_number)])
# Revoke the OSDs access keys
subprocess.check_output(['ceph', 'auth', 'del',
'osd.{}'.format(dead_osd_number)])
subprocess.check_output(['ceph', 'osd', 'rm',
'osd.{}'.format(dead_osd_number)])
status_set('maintenance', 'Setting up replacement osd {}'.format(
new_osd_device))
osdize(new_osd_device,
osd_format,
osd_journal,
reformat_osd,
ignore_errors)
except subprocess.CalledProcessError as e:
log('replace_osd failed with error: ' + e.output)
def is_osd_disk(dev):
try:
info = subprocess.check_output(['sgdisk', '-i', '1', dev])

View File

@ -18,7 +18,7 @@ deps = -r{toxinidir}/requirements.txt
basepython = python2.7
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
commands = flake8 {posargs} hooks unit_tests tests
commands = flake8 {posargs} actions hooks unit_tests tests
charm proof
[testenv:venv]

View File

@ -1,2 +1,3 @@
import sys
sys.path.append('hooks')
sys.path.append('actions')

View File

@ -0,0 +1,113 @@
import errno
import posix
from mock import call, Mock, patch
import test_utils
import ceph
import replace_osd
TO_PATCH = [
'ctypes',
'status_set',
]
proc_data = [
' 8 0 sda 2291336 263100 108136080 1186276 28844343 28798167 '
'2145908072 49433216 0 7550032 50630100\n',
' 8 1 sda1 1379 1636 8314 692 75 17 1656 0 0 496 692\n',
' 8 2 sda2 1 0 2 0 0 0 0 0 0 0 0\n',
]
def umount_busy(*args):
# MNT_FORCE
if args[1] == 1:
return -1
# MNT_DETACH
if args[1] == 2:
return 0
class ReplaceOsdTestCase(test_utils.CharmTestCase):
def setUp(self):
super(ReplaceOsdTestCase, self).setUp(ceph, TO_PATCH)
def test_umount_ebusy(self):
self.ctypes.util.find_library.return_value = 'libc.so.6'
umount_mock = Mock()
self.ctypes.CDLL.return_value = umount_mock
umount_mock.umount.side_effect = umount_busy
self.ctypes.get_errno.return_value = errno.EBUSY
ret = ceph.umount('/some/osd/mount')
umount_mock.assert_has_calls([
call.umount('/some/osd/mount', 1),
call.umount('/some/osd/mount', 2),
])
assert ret == 0
def test_umount(self):
self.ctypes.util.find_library.return_value = 'libc.so.6'
umount_mock = Mock()
self.ctypes.CDLL.return_value = umount_mock
umount_mock.umount.return_value = 0
ret = ceph.umount('/some/osd/mount')
umount_mock.assert_has_calls([
call.umount('/some/osd/mount', 1),
])
assert ret == 0
@patch('ceph.mounts')
@patch('ceph.subprocess')
@patch('ceph.umount')
@patch('ceph.osdize')
@patch('ceph.shutil')
@patch('ceph.systemd')
def test_replace_osd(self,
systemd,
shutil,
osdize,
umount,
subprocess,
mounts):
mounts.return_value = [['/var/lib/ceph/osd/ceph-a', '/dev/sda']]
subprocess.check_output.return_value = True
self.status_set.return_value = None
systemd.return_value = False
umount.return_value = 0
osdize.return_value = None
shutil.rmtree.return_value = None
ceph.replace_osd(dead_osd_number=0,
dead_osd_device='/dev/sda',
new_osd_device='/dev/sdb',
osd_format=True,
osd_journal=None,
reformat_osd=False,
ignore_errors=False)
subprocess.check_output.assert_has_calls(
[
call(['ceph', 'osd', 'out', 'osd.0']),
call(['stop', 'ceph-osd', 'id=0']),
call(['ceph', 'osd', 'crush', 'remove', 'osd.0']),
call(['ceph', 'auth', 'del', 'osd.0']),
call(['ceph', 'osd', 'rm', 'osd.0'])
]
)
@patch('replace_osd.get_disk_stats')
def test_lookup_device_name(self, disk_stats):
disk_stats.return_value = proc_data
dev_name = replace_osd.lookup_device_name(major_number=8,
minor_number=0)
assert dev_name == 'sda', "dev_name: {}".format(dev_name)
@patch('replace_osd.os.lstat')
def test_get_device_number(self, lstat):
lstat.return_value = posix.stat_result([
16877, 16, 51729L, 3, 0, 0, 217, 0, 1458086872, 1458086872
])
major, minor = replace_osd.get_device_number(1)
assert major == 202
assert minor == 17