Rolling upgrades of ceph osd cluster
This change adds functionality to allow the ceph osd cluster to upgrade in a serial rolled fashion. This will use the ceph monitor cluster to lock and allows only 1 ceph osd server at a time to upgrade. The upgrade is initiated setting a config value for source for the service which will prompt the osd cluster to upgrade to that new source and restart all osds processes server by server. If an osd server has been waiting on a previous server for more than 10 minutes and hasn't seen it finish it will assume it died during the upgrade and proceed with its own upgrade. I had to modify the amulet test slightly to use the ceph-mon charm instead of the default ceph charm. I also changed the test so that it uses 3 ceph-osd servers instead of 1. Limtations of this patch: If the osd failure domain has been set to osd than this patch will cause brief temporary outages while osd processes are being restarted. Future work will handle this case. This reverts commit db09fdce938a7c2726beee643ddf0e9091966db3. Change-Id: Ied010278085611b6d552e050a9d2bfdad7f3d35d
This commit is contained in:
parent
a5e9bec199
commit
4285f14a55
@ -5,6 +5,7 @@ include:
|
||||
- cli
|
||||
- fetch
|
||||
- contrib.storage.linux:
|
||||
- ceph
|
||||
- utils
|
||||
- contrib.openstack.alternatives
|
||||
- contrib.network.ip
|
||||
|
133
hooks/ceph.py
133
hooks/ceph.py
@ -19,11 +19,10 @@ from charmhelpers.cli.host import mounts
|
||||
from charmhelpers.core.host import (
|
||||
mkdir,
|
||||
chownr,
|
||||
service_restart,
|
||||
cmp_pkgrevno,
|
||||
lsb_release,
|
||||
service_stop
|
||||
)
|
||||
service_stop,
|
||||
service_restart)
|
||||
from charmhelpers.core.hookenv import (
|
||||
log,
|
||||
ERROR,
|
||||
@ -58,6 +57,112 @@ def ceph_user():
|
||||
return "root"
|
||||
|
||||
|
||||
class CrushLocation(object):
|
||||
def __init__(self,
|
||||
name,
|
||||
identifier,
|
||||
host,
|
||||
rack,
|
||||
row,
|
||||
datacenter,
|
||||
chassis,
|
||||
root):
|
||||
self.name = name
|
||||
self.identifier = identifier
|
||||
self.host = host
|
||||
self.rack = rack
|
||||
self.row = row
|
||||
self.datacenter = datacenter
|
||||
self.chassis = chassis
|
||||
self.root = root
|
||||
|
||||
def __str__(self):
|
||||
return "name: {} id: {} host: {} rack: {} row: {} datacenter: {} " \
|
||||
"chassis :{} root: {}".format(self.name, self.identifier,
|
||||
self.host, self.rack, self.row,
|
||||
self.datacenter, self.chassis,
|
||||
self.root)
|
||||
|
||||
def __eq__(self, other):
|
||||
return not self.name < other.name and not other.name < self.name
|
||||
|
||||
def __ne__(self, other):
|
||||
return self.name < other.name or other.name < self.name
|
||||
|
||||
def __gt__(self, other):
|
||||
return self.name > other.name
|
||||
|
||||
def __ge__(self, other):
|
||||
return not self.name < other.name
|
||||
|
||||
def __le__(self, other):
|
||||
return self.name < other.name
|
||||
|
||||
|
||||
def get_osd_tree(service):
|
||||
"""
|
||||
Returns the current osd map in JSON.
|
||||
:return: List. :raise: ValueError if the monmap fails to parse.
|
||||
Also raises CalledProcessError if our ceph command fails
|
||||
"""
|
||||
try:
|
||||
tree = subprocess.check_output(
|
||||
['ceph', '--id', service,
|
||||
'osd', 'tree', '--format=json'])
|
||||
try:
|
||||
json_tree = json.loads(tree)
|
||||
crush_list = []
|
||||
# Make sure children are present in the json
|
||||
if not json_tree['nodes']:
|
||||
return None
|
||||
child_ids = json_tree['nodes'][0]['children']
|
||||
for child in json_tree['nodes']:
|
||||
if child['id'] in child_ids:
|
||||
crush_list.append(
|
||||
CrushLocation(
|
||||
name=child.get('name'),
|
||||
identifier=child['id'],
|
||||
host=child.get('host'),
|
||||
rack=child.get('rack'),
|
||||
row=child.get('row'),
|
||||
datacenter=child.get('datacenter'),
|
||||
chassis=child.get('chassis'),
|
||||
root=child.get('root')
|
||||
)
|
||||
)
|
||||
return crush_list
|
||||
except ValueError as v:
|
||||
log("Unable to parse ceph tree json: {}. Error: {}".format(
|
||||
tree, v.message))
|
||||
raise
|
||||
except subprocess.CalledProcessError as e:
|
||||
log("ceph osd tree command failed with message: {}".format(
|
||||
e.message))
|
||||
raise
|
||||
|
||||
|
||||
def get_local_osd_ids():
|
||||
"""
|
||||
This will list the /var/lib/ceph/osd/* directories and try
|
||||
to split the ID off of the directory name and return it in
|
||||
a list
|
||||
|
||||
:return: list. A list of osd identifiers :raise: OSError if
|
||||
something goes wrong with listing the directory.
|
||||
"""
|
||||
osd_ids = []
|
||||
osd_path = os.path.join(os.sep, 'var', 'lib', 'ceph', 'osd')
|
||||
if os.path.exists(osd_path):
|
||||
try:
|
||||
dirs = os.listdir(osd_path)
|
||||
for osd_dir in dirs:
|
||||
osd_id = osd_dir.split('-')[1]
|
||||
osd_ids.append(osd_id)
|
||||
except OSError:
|
||||
raise
|
||||
return osd_ids
|
||||
|
||||
|
||||
def get_version():
|
||||
'''Derive Ceph release from an installed package.'''
|
||||
import apt_pkg as apt
|
||||
@ -308,6 +413,7 @@ def rescan_osd_devices():
|
||||
|
||||
|
||||
_bootstrap_keyring = "/var/lib/ceph/bootstrap-osd/ceph.keyring"
|
||||
_upgrade_keyring = "/var/lib/ceph/osd/ceph.client.osd-upgrade.keyring"
|
||||
|
||||
|
||||
def is_bootstrapped():
|
||||
@ -333,6 +439,21 @@ def import_osd_bootstrap_key(key):
|
||||
]
|
||||
subprocess.check_call(cmd)
|
||||
|
||||
|
||||
def import_osd_upgrade_key(key):
|
||||
if not os.path.exists(_upgrade_keyring):
|
||||
cmd = [
|
||||
"sudo",
|
||||
"-u",
|
||||
ceph_user(),
|
||||
'ceph-authtool',
|
||||
_upgrade_keyring,
|
||||
'--create-keyring',
|
||||
'--name=client.osd-upgrade',
|
||||
'--add-key={}'.format(key)
|
||||
]
|
||||
subprocess.check_call(cmd)
|
||||
|
||||
# OSD caps taken from ceph-create-keys
|
||||
_osd_bootstrap_caps = {
|
||||
'mon': [
|
||||
@ -499,7 +620,7 @@ def update_monfs():
|
||||
|
||||
|
||||
def maybe_zap_journal(journal_dev):
|
||||
if (is_osd_disk(journal_dev)):
|
||||
if is_osd_disk(journal_dev):
|
||||
log('Looks like {} is already an OSD data'
|
||||
' or journal, skipping.'.format(journal_dev))
|
||||
return
|
||||
@ -543,7 +664,7 @@ def osdize_dev(dev, osd_format, osd_journal, reformat_osd=False,
|
||||
log('Path {} is not a block device - bailing'.format(dev))
|
||||
return
|
||||
|
||||
if (is_osd_disk(dev) and not reformat_osd):
|
||||
if is_osd_disk(dev) and not reformat_osd:
|
||||
log('Looks like {} is already an'
|
||||
' OSD data or journal, skipping.'.format(dev))
|
||||
return
|
||||
@ -617,7 +738,7 @@ def filesystem_mounted(fs):
|
||||
|
||||
|
||||
def get_running_osds():
|
||||
'''Returns a list of the pids of the current running OSD daemons'''
|
||||
"""Returns a list of the pids of the current running OSD daemons"""
|
||||
cmd = ['pgrep', 'ceph-osd']
|
||||
try:
|
||||
result = subprocess.check_output(cmd)
|
||||
|
@ -9,12 +9,16 @@
|
||||
|
||||
import glob
|
||||
import os
|
||||
import random
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import socket
|
||||
import time
|
||||
|
||||
import ceph
|
||||
from charmhelpers.core import hookenv
|
||||
from charmhelpers.core.hookenv import (
|
||||
log,
|
||||
ERROR,
|
||||
@ -31,8 +35,8 @@ from charmhelpers.core.hookenv import (
|
||||
from charmhelpers.core.host import (
|
||||
umount,
|
||||
mkdir,
|
||||
cmp_pkgrevno
|
||||
)
|
||||
cmp_pkgrevno,
|
||||
service_stop, service_start)
|
||||
from charmhelpers.fetch import (
|
||||
add_source,
|
||||
apt_install,
|
||||
@ -40,25 +44,217 @@ from charmhelpers.fetch import (
|
||||
filter_installed_packages,
|
||||
)
|
||||
from charmhelpers.core.sysctl import create as create_sysctl
|
||||
from charmhelpers.core import host
|
||||
|
||||
from utils import (
|
||||
get_host_ip,
|
||||
get_networks,
|
||||
assert_charm_supports_ipv6,
|
||||
render_template,
|
||||
)
|
||||
render_template)
|
||||
|
||||
from charmhelpers.contrib.openstack.alternatives import install_alternative
|
||||
from charmhelpers.contrib.network.ip import (
|
||||
get_ipv6_addr,
|
||||
format_ipv6_addr,
|
||||
)
|
||||
|
||||
from charmhelpers.contrib.storage.linux.ceph import (
|
||||
monitor_key_set,
|
||||
monitor_key_exists,
|
||||
monitor_key_get)
|
||||
from charmhelpers.contrib.charmsupport import nrpe
|
||||
from charmhelpers.contrib.hardening.harden import harden
|
||||
|
||||
hooks = Hooks()
|
||||
|
||||
# A dict of valid ceph upgrade paths. Mapping is old -> new
|
||||
upgrade_paths = {
|
||||
'cloud:trusty-juno': 'cloud:trusty-kilo',
|
||||
'cloud:trusty-kilo': 'cloud:trusty-liberty',
|
||||
'cloud:trusty-liberty': 'cloud:trusty-mitaka',
|
||||
}
|
||||
|
||||
|
||||
def pretty_print_upgrade_paths():
|
||||
lines = []
|
||||
for key, value in upgrade_paths.iteritems():
|
||||
lines.append("{} -> {}".format(key, value))
|
||||
return lines
|
||||
|
||||
|
||||
def check_for_upgrade():
|
||||
release_info = host.lsb_release()
|
||||
if not release_info['DISTRIB_CODENAME'] == 'trusty':
|
||||
log("Invalid upgrade path from {}. Only trusty is currently "
|
||||
"supported".format(release_info['DISTRIB_CODENAME']))
|
||||
return
|
||||
|
||||
c = hookenv.config()
|
||||
old_version = c.previous('source')
|
||||
log('old_version: {}'.format(old_version))
|
||||
# Strip all whitespace
|
||||
new_version = hookenv.config('source')
|
||||
if new_version:
|
||||
# replace all whitespace
|
||||
new_version = new_version.replace(' ', '')
|
||||
log('new_version: {}'.format(new_version))
|
||||
|
||||
if old_version in upgrade_paths:
|
||||
if new_version == upgrade_paths[old_version]:
|
||||
log("{} to {} is a valid upgrade path. Proceeding.".format(
|
||||
old_version, new_version))
|
||||
roll_osd_cluster(new_version)
|
||||
else:
|
||||
# Log a helpful error message
|
||||
log("Invalid upgrade path from {} to {}. "
|
||||
"Valid paths are: {}".format(old_version,
|
||||
new_version,
|
||||
pretty_print_upgrade_paths()))
|
||||
|
||||
|
||||
def lock_and_roll(my_name):
|
||||
start_timestamp = time.time()
|
||||
|
||||
log('monitor_key_set {}_start {}'.format(my_name, start_timestamp))
|
||||
monitor_key_set('osd-upgrade', "{}_start".format(my_name), start_timestamp)
|
||||
log("Rolling")
|
||||
# This should be quick
|
||||
upgrade_osd()
|
||||
log("Done")
|
||||
|
||||
stop_timestamp = time.time()
|
||||
# Set a key to inform others I am finished
|
||||
log('monitor_key_set {}_done {}'.format(my_name, stop_timestamp))
|
||||
monitor_key_set('osd-upgrade', "{}_done".format(my_name), stop_timestamp)
|
||||
|
||||
|
||||
def wait_on_previous_node(previous_node):
|
||||
log("Previous node is: {}".format(previous_node))
|
||||
|
||||
previous_node_finished = monitor_key_exists(
|
||||
'osd-upgrade',
|
||||
"{}_done".format(previous_node))
|
||||
|
||||
while previous_node_finished is False:
|
||||
log("{} is not finished. Waiting".format(previous_node))
|
||||
# Has this node been trying to upgrade for longer than
|
||||
# 10 minutes?
|
||||
# If so then move on and consider that node dead.
|
||||
|
||||
# NOTE: This assumes the clusters clocks are somewhat accurate
|
||||
# If the hosts clock is really far off it may cause it to skip
|
||||
# the previous node even though it shouldn't.
|
||||
current_timestamp = time.time()
|
||||
previous_node_start_time = monitor_key_get(
|
||||
'osd-upgrade',
|
||||
"{}_start".format(previous_node))
|
||||
if (current_timestamp - (10 * 60)) > previous_node_start_time:
|
||||
# Previous node is probably dead. Lets move on
|
||||
if previous_node_start_time is not None:
|
||||
log(
|
||||
"Waited 10 mins on node {}. current time: {} > "
|
||||
"previous node start time: {} Moving on".format(
|
||||
previous_node,
|
||||
(current_timestamp - (10 * 60)),
|
||||
previous_node_start_time))
|
||||
return
|
||||
else:
|
||||
# I have to wait. Sleep a random amount of time and then
|
||||
# check if I can lock,upgrade and roll.
|
||||
wait_time = random.randrange(5, 30)
|
||||
log('waiting for {} seconds'.format(wait_time))
|
||||
time.sleep(wait_time)
|
||||
previous_node_finished = monitor_key_exists(
|
||||
'osd-upgrade',
|
||||
"{}_done".format(previous_node))
|
||||
|
||||
|
||||
def get_upgrade_position(osd_sorted_list, match_name):
|
||||
for index, item in enumerate(osd_sorted_list):
|
||||
if item.name == match_name:
|
||||
return index
|
||||
return None
|
||||
|
||||
|
||||
# Edge cases:
|
||||
# 1. Previous node dies on upgrade, can we retry?
|
||||
# 2. This assumes that the osd failure domain is not set to osd.
|
||||
# It rolls an entire server at a time.
|
||||
def roll_osd_cluster(new_version):
|
||||
"""
|
||||
This is tricky to get right so here's what we're going to do.
|
||||
There's 2 possible cases: Either I'm first in line or not.
|
||||
If I'm not first in line I'll wait a random time between 5-30 seconds
|
||||
and test to see if the previous osd is upgraded yet.
|
||||
|
||||
TODO: If you're not in the same failure domain it's safe to upgrade
|
||||
1. Examine all pools and adopt the most strict failure domain policy
|
||||
Example: Pool 1: Failure domain = rack
|
||||
Pool 2: Failure domain = host
|
||||
Pool 3: Failure domain = row
|
||||
|
||||
outcome: Failure domain = host
|
||||
"""
|
||||
log('roll_osd_cluster called with {}'.format(new_version))
|
||||
my_name = socket.gethostname()
|
||||
osd_tree = ceph.get_osd_tree(service='osd-upgrade')
|
||||
# A sorted list of osd unit names
|
||||
osd_sorted_list = sorted(osd_tree)
|
||||
log("osd_sorted_list: {}".format(osd_sorted_list))
|
||||
|
||||
try:
|
||||
position = get_upgrade_position(osd_sorted_list, my_name)
|
||||
log("upgrade position: {}".format(position))
|
||||
if position == 0:
|
||||
# I'm first! Roll
|
||||
# First set a key to inform others I'm about to roll
|
||||
lock_and_roll(my_name=my_name)
|
||||
else:
|
||||
# Check if the previous node has finished
|
||||
status_set('blocked',
|
||||
'Waiting on {} to finish upgrading'.format(
|
||||
osd_sorted_list[position - 1].name))
|
||||
wait_on_previous_node(
|
||||
previous_node=osd_sorted_list[position - 1].name)
|
||||
lock_and_roll(my_name=my_name)
|
||||
except ValueError:
|
||||
log("Failed to find name {} in list {}".format(
|
||||
my_name, osd_sorted_list))
|
||||
status_set('blocked', 'failed to upgrade osd')
|
||||
|
||||
|
||||
def upgrade_osd():
|
||||
current_version = ceph.get_version()
|
||||
status_set("maintenance", "Upgrading osd")
|
||||
log("Current ceph version is {}".format(current_version))
|
||||
new_version = config('release-version')
|
||||
log("Upgrading to: {}".format(new_version))
|
||||
|
||||
try:
|
||||
add_source(config('source'), config('key'))
|
||||
apt_update(fatal=True)
|
||||
except subprocess.CalledProcessError as err:
|
||||
log("Adding the ceph source failed with message: {}".format(
|
||||
err.message))
|
||||
status_set("blocked", "Upgrade to {} failed".format(new_version))
|
||||
sys.exit(1)
|
||||
try:
|
||||
if ceph.systemd():
|
||||
for osd_id in ceph.get_local_osd_ids():
|
||||
service_stop('ceph-osd@{}'.format(osd_id))
|
||||
else:
|
||||
service_stop('ceph-osd-all')
|
||||
apt_install(packages=ceph.PACKAGES, fatal=True)
|
||||
if ceph.systemd():
|
||||
for osd_id in ceph.get_local_osd_ids():
|
||||
service_start('ceph-osd@{}'.format(osd_id))
|
||||
else:
|
||||
service_start('ceph-osd-all')
|
||||
except subprocess.CalledProcessError as err:
|
||||
log("Stopping ceph and upgrading packages failed "
|
||||
"with message: {}".format(err.message))
|
||||
status_set("blocked", "Upgrade to {} failed".format(new_version))
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def install_upstart_scripts():
|
||||
# Only install upstart configurations for older versions
|
||||
@ -126,6 +322,7 @@ def emit_cephconf():
|
||||
install_alternative('ceph.conf', '/etc/ceph/ceph.conf',
|
||||
charm_ceph_conf, 90)
|
||||
|
||||
|
||||
JOURNAL_ZAPPED = '/var/lib/ceph/journal_zapped'
|
||||
|
||||
|
||||
@ -161,6 +358,9 @@ def check_overlap(journaldevs, datadevs):
|
||||
@hooks.hook('config-changed')
|
||||
@harden()
|
||||
def config_changed():
|
||||
# Check if an upgrade was requested
|
||||
check_for_upgrade()
|
||||
|
||||
# Pre-flight checks
|
||||
if config('osd-format') not in ceph.DISK_FORMATS:
|
||||
log('Invalid OSD disk format configuration specified', level=ERROR)
|
||||
@ -174,7 +374,7 @@ def config_changed():
|
||||
create_sysctl(sysctl_dict, '/etc/sysctl.d/50-ceph-osd-charm.conf')
|
||||
|
||||
e_mountpoint = config('ephemeral-unmount')
|
||||
if (e_mountpoint and ceph.filesystem_mounted(e_mountpoint)):
|
||||
if e_mountpoint and ceph.filesystem_mounted(e_mountpoint):
|
||||
umount(e_mountpoint)
|
||||
prepare_disks_and_activate()
|
||||
|
||||
@ -204,8 +404,14 @@ def get_mon_hosts():
|
||||
hosts = []
|
||||
for relid in relation_ids('mon'):
|
||||
for unit in related_units(relid):
|
||||
addr = relation_get('ceph-public-address', unit, relid) or \
|
||||
get_host_ip(relation_get('private-address', unit, relid))
|
||||
addr = \
|
||||
relation_get('ceph-public-address',
|
||||
unit,
|
||||
relid) or get_host_ip(
|
||||
relation_get(
|
||||
'private-address',
|
||||
unit,
|
||||
relid))
|
||||
|
||||
if addr:
|
||||
hosts.append('{}:6789'.format(format_ipv6_addr(addr) or addr))
|
||||
@ -261,10 +467,12 @@ def get_journal_devices():
|
||||
'mon-relation-departed')
|
||||
def mon_relation():
|
||||
bootstrap_key = relation_get('osd_bootstrap_key')
|
||||
upgrade_key = relation_get('osd_upgrade_key')
|
||||
if get_fsid() and get_auth() and bootstrap_key:
|
||||
log('mon has provided conf- scanning disks')
|
||||
emit_cephconf()
|
||||
ceph.import_osd_bootstrap_key(bootstrap_key)
|
||||
ceph.import_osd_upgrade_key(upgrade_key)
|
||||
prepare_disks_and_activate()
|
||||
else:
|
||||
log('mon cluster has not yet provided conf')
|
||||
|
1195
hooks/charmhelpers/contrib/storage/linux/ceph.py
Normal file
1195
hooks/charmhelpers/contrib/storage/linux/ceph.py
Normal file
File diff suppressed because it is too large
Load Diff
@ -33,6 +33,8 @@ cluster addr = {{ cluster_addr }}
|
||||
osd crush location = {{crush_location}}
|
||||
{% endif %}
|
||||
|
||||
[client.osd-upgrade]
|
||||
keyring = /var/lib/ceph/osd/ceph.client.osd-upgrade.keyring
|
||||
|
||||
[mon]
|
||||
keyring = /var/lib/ceph/mon/$cluster-$id/keyring
|
||||
|
@ -43,8 +43,8 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment):
|
||||
and the rest of the service are from lp branches that are
|
||||
compatible with the local charm (e.g. stable or next).
|
||||
"""
|
||||
this_service = {'name': 'ceph-osd'}
|
||||
other_services = [{'name': 'ceph', 'units': 3},
|
||||
this_service = {'name': 'ceph-osd', 'units': 3}
|
||||
other_services = [{'name': 'ceph-mon', 'units': 3},
|
||||
{'name': 'mysql'},
|
||||
{'name': 'keystone'},
|
||||
{'name': 'rabbitmq-server'},
|
||||
@ -60,18 +60,18 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment):
|
||||
'nova-compute:shared-db': 'mysql:shared-db',
|
||||
'nova-compute:amqp': 'rabbitmq-server:amqp',
|
||||
'nova-compute:image-service': 'glance:image-service',
|
||||
'nova-compute:ceph': 'ceph:client',
|
||||
'nova-compute:ceph': 'ceph-mon:client',
|
||||
'keystone:shared-db': 'mysql:shared-db',
|
||||
'glance:shared-db': 'mysql:shared-db',
|
||||
'glance:identity-service': 'keystone:identity-service',
|
||||
'glance:amqp': 'rabbitmq-server:amqp',
|
||||
'glance:ceph': 'ceph:client',
|
||||
'glance:ceph': 'ceph-mon:client',
|
||||
'cinder:shared-db': 'mysql:shared-db',
|
||||
'cinder:identity-service': 'keystone:identity-service',
|
||||
'cinder:amqp': 'rabbitmq-server:amqp',
|
||||
'cinder:image-service': 'glance:image-service',
|
||||
'cinder:ceph': 'ceph:client',
|
||||
'ceph-osd:mon': 'ceph:osd'
|
||||
'cinder:ceph': 'ceph-mon:client',
|
||||
'ceph-osd:mon': 'ceph-mon:osd'
|
||||
}
|
||||
super(CephOsdBasicDeployment, self)._add_relations(relations)
|
||||
|
||||
@ -86,9 +86,6 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment):
|
||||
'auth-supported': 'none',
|
||||
'fsid': '6547bd3e-1397-11e2-82e5-53567c8d32dc',
|
||||
'monitor-secret': 'AQCXrnZQwI7KGBAAiPofmKEXKxu5bUzoYLVkbQ==',
|
||||
'osd-reformat': 'yes',
|
||||
'ephemeral-unmount': '/mnt',
|
||||
'osd-devices': '/dev/vdb /srv/ceph'
|
||||
}
|
||||
|
||||
# Include a non-existent device as osd-devices is a whitelist,
|
||||
@ -102,7 +99,7 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment):
|
||||
configs = {'keystone': keystone_config,
|
||||
'mysql': mysql_config,
|
||||
'cinder': cinder_config,
|
||||
'ceph': ceph_config,
|
||||
'ceph-mon': ceph_config,
|
||||
'ceph-osd': ceph_osd_config}
|
||||
super(CephOsdBasicDeployment, self)._configure_services(configs)
|
||||
|
||||
@ -115,10 +112,12 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment):
|
||||
self.nova_sentry = self.d.sentry.unit['nova-compute/0']
|
||||
self.glance_sentry = self.d.sentry.unit['glance/0']
|
||||
self.cinder_sentry = self.d.sentry.unit['cinder/0']
|
||||
self.ceph0_sentry = self.d.sentry.unit['ceph/0']
|
||||
self.ceph1_sentry = self.d.sentry.unit['ceph/1']
|
||||
self.ceph2_sentry = self.d.sentry.unit['ceph/2']
|
||||
self.ceph0_sentry = self.d.sentry.unit['ceph-mon/0']
|
||||
self.ceph1_sentry = self.d.sentry.unit['ceph-mon/1']
|
||||
self.ceph2_sentry = self.d.sentry.unit['ceph-mon/2']
|
||||
self.ceph_osd_sentry = self.d.sentry.unit['ceph-osd/0']
|
||||
self.ceph_osd1_sentry = self.d.sentry.unit['ceph-osd/1']
|
||||
self.ceph_osd2_sentry = self.d.sentry.unit['ceph-osd/2']
|
||||
u.log.debug('openstack release val: {}'.format(
|
||||
self._get_openstack_release()))
|
||||
u.log.debug('openstack release str: {}'.format(
|
||||
@ -177,7 +176,6 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment):
|
||||
# Process name and quantity of processes to expect on each unit
|
||||
ceph_processes = {
|
||||
'ceph-mon': 1,
|
||||
'ceph-osd': 2
|
||||
}
|
||||
|
||||
# Units with process names and PID quantities expected
|
||||
@ -214,9 +212,6 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment):
|
||||
ceph_services = [
|
||||
'ceph-mon-all',
|
||||
'ceph-mon id=`hostname`',
|
||||
'ceph-osd-all',
|
||||
'ceph-osd id={}'.format(u.get_ceph_osd_id_cmd(0)),
|
||||
'ceph-osd id={}'.format(u.get_ceph_osd_id_cmd(1))
|
||||
]
|
||||
services[self.ceph0_sentry] = ceph_services
|
||||
services[self.ceph1_sentry] = ceph_services
|
||||
@ -233,16 +228,16 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment):
|
||||
|
||||
def test_200_ceph_osd_ceph_relation(self):
|
||||
"""Verify the ceph-osd to ceph relation data."""
|
||||
u.log.debug('Checking ceph-osd:ceph mon relation data...')
|
||||
u.log.debug('Checking ceph-osd:ceph-mon relation data...')
|
||||
unit = self.ceph_osd_sentry
|
||||
relation = ['mon', 'ceph:osd']
|
||||
relation = ['mon', 'ceph-mon:osd']
|
||||
expected = {
|
||||
'private-address': u.valid_ip
|
||||
}
|
||||
|
||||
ret = u.validate_relation_data(unit, relation, expected)
|
||||
if ret:
|
||||
message = u.relation_error('ceph-osd to ceph', ret)
|
||||
message = u.relation_error('ceph-osd to ceph-mon', ret)
|
||||
amulet.raise_status(amulet.FAIL, msg=message)
|
||||
|
||||
def test_201_ceph0_to_ceph_osd_relation(self):
|
||||
|
157
unit_tests/test_upgrade_roll.py
Normal file
157
unit_tests/test_upgrade_roll.py
Normal file
@ -0,0 +1,157 @@
|
||||
import time
|
||||
|
||||
__author__ = 'chris'
|
||||
from mock import patch, call, MagicMock
|
||||
import sys
|
||||
|
||||
sys.path.append('/home/chris/repos/ceph-osd/hooks')
|
||||
|
||||
from ceph import CrushLocation
|
||||
|
||||
import test_utils
|
||||
import ceph_hooks
|
||||
|
||||
TO_PATCH = [
|
||||
'apt_install',
|
||||
'apt_update',
|
||||
'add_source',
|
||||
'config',
|
||||
'ceph',
|
||||
'get_conf',
|
||||
'hookenv',
|
||||
'host',
|
||||
'log',
|
||||
'service_start',
|
||||
'service_stop',
|
||||
'socket',
|
||||
'status_set',
|
||||
]
|
||||
|
||||
|
||||
def config_side_effect(*args):
|
||||
if args[0] == 'source':
|
||||
return 'cloud:trusty-kilo'
|
||||
elif args[0] == 'key':
|
||||
return 'key'
|
||||
elif args[0] == 'release-version':
|
||||
return 'cloud:trusty-kilo'
|
||||
|
||||
|
||||
previous_node_start_time = time.time() - (9 * 60)
|
||||
|
||||
|
||||
def monitor_key_side_effect(*args):
|
||||
if args[1] == \
|
||||
'ip-192-168-1-2_done':
|
||||
return False
|
||||
elif args[1] == \
|
||||
'ip-192-168-1-2_start':
|
||||
# Return that the previous node started 9 minutes ago
|
||||
return previous_node_start_time
|
||||
|
||||
|
||||
class UpgradeRollingTestCase(test_utils.CharmTestCase):
|
||||
def setUp(self):
|
||||
super(UpgradeRollingTestCase, self).setUp(ceph_hooks, TO_PATCH)
|
||||
|
||||
@patch('ceph_hooks.roll_osd_cluster')
|
||||
def test_check_for_upgrade(self, roll_osd_cluster):
|
||||
self.host.lsb_release.return_value = {
|
||||
'DISTRIB_CODENAME': 'trusty',
|
||||
}
|
||||
previous_mock = MagicMock().return_value
|
||||
previous_mock.previous.return_value = "cloud:trusty-juno"
|
||||
self.hookenv.config.side_effect = [previous_mock,
|
||||
config_side_effect('source')]
|
||||
ceph_hooks.check_for_upgrade()
|
||||
|
||||
roll_osd_cluster.assert_called_with('cloud:trusty-kilo')
|
||||
|
||||
@patch('ceph_hooks.upgrade_osd')
|
||||
@patch('ceph_hooks.monitor_key_set')
|
||||
def test_lock_and_roll(self, monitor_key_set, upgrade_osd):
|
||||
monitor_key_set.monitor_key_set.return_value = None
|
||||
ceph_hooks.lock_and_roll(my_name='ip-192-168-1-2')
|
||||
upgrade_osd.assert_called_once_with()
|
||||
|
||||
def test_upgrade_osd(self):
|
||||
self.config.side_effect = config_side_effect
|
||||
self.ceph.get_version.return_value = "0.80"
|
||||
self.ceph.systemd.return_value = False
|
||||
ceph_hooks.upgrade_osd()
|
||||
self.service_stop.assert_called_with('ceph-osd-all')
|
||||
self.service_start.assert_called_with('ceph-osd-all')
|
||||
self.status_set.assert_has_calls([
|
||||
call('maintenance', 'Upgrading osd'),
|
||||
])
|
||||
|
||||
@patch('ceph_hooks.lock_and_roll')
|
||||
@patch('ceph_hooks.get_upgrade_position')
|
||||
def test_roll_osd_cluster_first(self,
|
||||
get_upgrade_position,
|
||||
lock_and_roll):
|
||||
self.socket.gethostname.return_value = "ip-192-168-1-2"
|
||||
self.ceph.get_osd_tree.return_value = ""
|
||||
get_upgrade_position.return_value = 0
|
||||
ceph_hooks.roll_osd_cluster('0.94.1')
|
||||
lock_and_roll.assert_called_with(my_name="ip-192-168-1-2")
|
||||
|
||||
@patch('ceph_hooks.lock_and_roll')
|
||||
@patch('ceph_hooks.get_upgrade_position')
|
||||
@patch('ceph_hooks.wait_on_previous_node')
|
||||
def test_roll_osd_cluster_second(self,
|
||||
wait_on_previous_node,
|
||||
get_upgrade_position,
|
||||
lock_and_roll):
|
||||
wait_on_previous_node.return_value = None
|
||||
self.socket.gethostname.return_value = "ip-192-168-1-3"
|
||||
self.ceph.get_osd_tree.return_value = [
|
||||
CrushLocation(
|
||||
name="ip-192-168-1-2",
|
||||
identifier='a',
|
||||
host='host-a',
|
||||
rack='rack-a',
|
||||
row='row-a',
|
||||
datacenter='dc-1',
|
||||
chassis='chassis-a',
|
||||
root='ceph'),
|
||||
CrushLocation(
|
||||
name="ip-192-168-1-3",
|
||||
identifier='a',
|
||||
host='host-b',
|
||||
rack='rack-a',
|
||||
row='row-a',
|
||||
datacenter='dc-1',
|
||||
chassis='chassis-a',
|
||||
root='ceph')
|
||||
]
|
||||
get_upgrade_position.return_value = 1
|
||||
ceph_hooks.roll_osd_cluster('0.94.1')
|
||||
self.status_set.assert_called_with(
|
||||
'blocked',
|
||||
'Waiting on ip-192-168-1-2 to finish upgrading')
|
||||
lock_and_roll.assert_called_with(my_name="ip-192-168-1-3")
|
||||
|
||||
@patch('ceph_hooks.monitor_key_get')
|
||||
@patch('ceph_hooks.monitor_key_exists')
|
||||
def test_wait_on_previous_node(self,
|
||||
monitor_key_exists,
|
||||
monitor_key_get):
|
||||
monitor_key_get.side_effect = monitor_key_side_effect
|
||||
monitor_key_exists.return_value = False
|
||||
|
||||
ceph_hooks.wait_on_previous_node("ip-192-168-1-2")
|
||||
|
||||
# Make sure we checked to see if the previous node started
|
||||
monitor_key_get.assert_has_calls(
|
||||
[call('osd-upgrade', 'ip-192-168-1-2_start')]
|
||||
)
|
||||
# Make sure we checked to see if the previous node was finished
|
||||
monitor_key_exists.assert_has_calls(
|
||||
[call('osd-upgrade', 'ip-192-168-1-2_done')]
|
||||
)
|
||||
# Make sure we waited at last once before proceeding
|
||||
self.log.assert_has_calls(
|
||||
[call('Previous node is: ip-192-168-1-2')],
|
||||
[call('ip-192-168-1-2 is not finished. Waiting')],
|
||||
)
|
Loading…
x
Reference in New Issue
Block a user