diff --git a/charm-helpers-hooks.yaml b/charm-helpers-hooks.yaml
index c8c54766..cb5cbac0 100644
--- a/charm-helpers-hooks.yaml
+++ b/charm-helpers-hooks.yaml
@@ -5,6 +5,7 @@ include:
- cli
- fetch
- contrib.storage.linux:
+ - ceph
- utils
- contrib.openstack.alternatives
- contrib.network.ip
diff --git a/hooks/ceph.py b/hooks/ceph.py
index 51b06ac8..0b23979b 100644
--- a/hooks/ceph.py
+++ b/hooks/ceph.py
@@ -19,11 +19,10 @@ from charmhelpers.cli.host import mounts
from charmhelpers.core.host import (
mkdir,
chownr,
- service_restart,
cmp_pkgrevno,
lsb_release,
- service_stop
-)
+ service_stop,
+ service_restart)
from charmhelpers.core.hookenv import (
log,
ERROR,
@@ -58,6 +57,112 @@ def ceph_user():
return "root"
+class CrushLocation(object):
+ def __init__(self,
+ name,
+ identifier,
+ host,
+ rack,
+ row,
+ datacenter,
+ chassis,
+ root):
+ self.name = name
+ self.identifier = identifier
+ self.host = host
+ self.rack = rack
+ self.row = row
+ self.datacenter = datacenter
+ self.chassis = chassis
+ self.root = root
+
+ def __str__(self):
+ return "name: {} id: {} host: {} rack: {} row: {} datacenter: {} " \
+ "chassis :{} root: {}".format(self.name, self.identifier,
+ self.host, self.rack, self.row,
+ self.datacenter, self.chassis,
+ self.root)
+
+ def __eq__(self, other):
+ return not self.name < other.name and not other.name < self.name
+
+ def __ne__(self, other):
+ return self.name < other.name or other.name < self.name
+
+ def __gt__(self, other):
+ return self.name > other.name
+
+ def __ge__(self, other):
+ return not self.name < other.name
+
+ def __le__(self, other):
+ return self.name < other.name
+
+
+def get_osd_tree(service):
+ """
+ Returns the current osd map in JSON.
+ :return: List. :raise: ValueError if the monmap fails to parse.
+ Also raises CalledProcessError if our ceph command fails
+ """
+ try:
+ tree = subprocess.check_output(
+ ['ceph', '--id', service,
+ 'osd', 'tree', '--format=json'])
+ try:
+ json_tree = json.loads(tree)
+ crush_list = []
+ # Make sure children are present in the json
+ if not json_tree['nodes']:
+ return None
+ child_ids = json_tree['nodes'][0]['children']
+ for child in json_tree['nodes']:
+ if child['id'] in child_ids:
+ crush_list.append(
+ CrushLocation(
+ name=child.get('name'),
+ identifier=child['id'],
+ host=child.get('host'),
+ rack=child.get('rack'),
+ row=child.get('row'),
+ datacenter=child.get('datacenter'),
+ chassis=child.get('chassis'),
+ root=child.get('root')
+ )
+ )
+ return crush_list
+ except ValueError as v:
+ log("Unable to parse ceph tree json: {}. Error: {}".format(
+ tree, v.message))
+ raise
+ except subprocess.CalledProcessError as e:
+ log("ceph osd tree command failed with message: {}".format(
+ e.message))
+ raise
+
+
+def get_local_osd_ids():
+ """
+ This will list the /var/lib/ceph/osd/* directories and try
+ to split the ID off of the directory name and return it in
+ a list
+
+ :return: list. A list of osd identifiers :raise: OSError if
+ something goes wrong with listing the directory.
+ """
+ osd_ids = []
+ osd_path = os.path.join(os.sep, 'var', 'lib', 'ceph', 'osd')
+ if os.path.exists(osd_path):
+ try:
+ dirs = os.listdir(osd_path)
+ for osd_dir in dirs:
+ osd_id = osd_dir.split('-')[1]
+ osd_ids.append(osd_id)
+ except OSError:
+ raise
+ return osd_ids
+
+
def get_version():
'''Derive Ceph release from an installed package.'''
import apt_pkg as apt
@@ -308,6 +413,7 @@ def rescan_osd_devices():
_bootstrap_keyring = "/var/lib/ceph/bootstrap-osd/ceph.keyring"
+_upgrade_keyring = "/var/lib/ceph/osd/ceph.client.osd-upgrade.keyring"
def is_bootstrapped():
@@ -333,6 +439,21 @@ def import_osd_bootstrap_key(key):
]
subprocess.check_call(cmd)
+
+def import_osd_upgrade_key(key):
+ if not os.path.exists(_upgrade_keyring):
+ cmd = [
+ "sudo",
+ "-u",
+ ceph_user(),
+ 'ceph-authtool',
+ _upgrade_keyring,
+ '--create-keyring',
+ '--name=client.osd-upgrade',
+ '--add-key={}'.format(key)
+ ]
+ subprocess.check_call(cmd)
+
# OSD caps taken from ceph-create-keys
_osd_bootstrap_caps = {
'mon': [
@@ -499,7 +620,7 @@ def update_monfs():
def maybe_zap_journal(journal_dev):
- if (is_osd_disk(journal_dev)):
+ if is_osd_disk(journal_dev):
log('Looks like {} is already an OSD data'
' or journal, skipping.'.format(journal_dev))
return
@@ -543,7 +664,7 @@ def osdize_dev(dev, osd_format, osd_journal, reformat_osd=False,
log('Path {} is not a block device - bailing'.format(dev))
return
- if (is_osd_disk(dev) and not reformat_osd):
+ if is_osd_disk(dev) and not reformat_osd:
log('Looks like {} is already an'
' OSD data or journal, skipping.'.format(dev))
return
@@ -617,7 +738,7 @@ def filesystem_mounted(fs):
def get_running_osds():
- '''Returns a list of the pids of the current running OSD daemons'''
+ """Returns a list of the pids of the current running OSD daemons"""
cmd = ['pgrep', 'ceph-osd']
try:
result = subprocess.check_output(cmd)
diff --git a/hooks/ceph_hooks.py b/hooks/ceph_hooks.py
index 912abc11..f31bbf52 100755
--- a/hooks/ceph_hooks.py
+++ b/hooks/ceph_hooks.py
@@ -9,12 +9,16 @@
import glob
import os
+import random
import shutil
+import subprocess
import sys
import tempfile
import socket
+import time
import ceph
+from charmhelpers.core import hookenv
from charmhelpers.core.hookenv import (
log,
ERROR,
@@ -31,8 +35,8 @@ from charmhelpers.core.hookenv import (
from charmhelpers.core.host import (
umount,
mkdir,
- cmp_pkgrevno
-)
+ cmp_pkgrevno,
+ service_stop, service_start)
from charmhelpers.fetch import (
add_source,
apt_install,
@@ -40,24 +44,216 @@ from charmhelpers.fetch import (
filter_installed_packages,
)
from charmhelpers.core.sysctl import create as create_sysctl
+from charmhelpers.core import host
from utils import (
get_host_ip,
get_networks,
assert_charm_supports_ipv6,
- render_template,
-)
+ render_template)
from charmhelpers.contrib.openstack.alternatives import install_alternative
from charmhelpers.contrib.network.ip import (
get_ipv6_addr,
format_ipv6_addr,
)
-
+from charmhelpers.contrib.storage.linux.ceph import (
+ monitor_key_set,
+ monitor_key_exists,
+ monitor_key_get)
from charmhelpers.contrib.charmsupport import nrpe
hooks = Hooks()
+# A dict of valid ceph upgrade paths. Mapping is old -> new
+upgrade_paths = {
+ 'cloud:trusty-juno': 'cloud:trusty-kilo',
+ 'cloud:trusty-kilo': 'cloud:trusty-liberty',
+ 'cloud:trusty-liberty': 'cloud:trusty-mitaka',
+}
+
+
+def pretty_print_upgrade_paths():
+ lines = []
+ for key, value in upgrade_paths.iteritems():
+ lines.append("{} -> {}".format(key, value))
+ return lines
+
+
+def check_for_upgrade():
+ release_info = host.lsb_release()
+ if not release_info['DISTRIB_CODENAME'] == 'trusty':
+ log("Invalid upgrade path from {}. Only trusty is currently "
+ "supported".format(release_info['DISTRIB_CODENAME']))
+ return
+
+ c = hookenv.config()
+ old_version = c.previous('source')
+ log('old_version: {}'.format(old_version))
+ # Strip all whitespace
+ new_version = hookenv.config('source')
+ if new_version:
+ # replace all whitespace
+ new_version = new_version.replace(' ', '')
+ log('new_version: {}'.format(new_version))
+
+ if old_version in upgrade_paths:
+ if new_version == upgrade_paths[old_version]:
+ log("{} to {} is a valid upgrade path. Proceeding.".format(
+ old_version, new_version))
+ roll_osd_cluster(new_version)
+ else:
+ # Log a helpful error message
+ log("Invalid upgrade path from {} to {}. "
+ "Valid paths are: {}".format(old_version,
+ new_version,
+ pretty_print_upgrade_paths()))
+
+
+def lock_and_roll(my_name):
+ start_timestamp = time.time()
+
+ log('monitor_key_set {}_start {}'.format(my_name, start_timestamp))
+ monitor_key_set('osd-upgrade', "{}_start".format(my_name), start_timestamp)
+ log("Rolling")
+ # This should be quick
+ upgrade_osd()
+ log("Done")
+
+ stop_timestamp = time.time()
+ # Set a key to inform others I am finished
+ log('monitor_key_set {}_done {}'.format(my_name, stop_timestamp))
+ monitor_key_set('osd-upgrade', "{}_done".format(my_name), stop_timestamp)
+
+
+def wait_on_previous_node(previous_node):
+ log("Previous node is: {}".format(previous_node))
+
+ previous_node_finished = monitor_key_exists(
+ 'osd-upgrade',
+ "{}_done".format(previous_node))
+
+ while previous_node_finished is False:
+ log("{} is not finished. Waiting".format(previous_node))
+ # Has this node been trying to upgrade for longer than
+ # 10 minutes?
+ # If so then move on and consider that node dead.
+
+ # NOTE: This assumes the clusters clocks are somewhat accurate
+ # If the hosts clock is really far off it may cause it to skip
+ # the previous node even though it shouldn't.
+ current_timestamp = time.time()
+ previous_node_start_time = monitor_key_get(
+ 'osd-upgrade',
+ "{}_start".format(previous_node))
+ if (current_timestamp - (10 * 60)) > previous_node_start_time:
+ # Previous node is probably dead. Lets move on
+ if previous_node_start_time is not None:
+ log(
+ "Waited 10 mins on node {}. current time: {} > "
+ "previous node start time: {} Moving on".format(
+ previous_node,
+ (current_timestamp - (10 * 60)),
+ previous_node_start_time))
+ return
+ else:
+ # I have to wait. Sleep a random amount of time and then
+ # check if I can lock,upgrade and roll.
+ wait_time = random.randrange(5, 30)
+ log('waiting for {} seconds'.format(wait_time))
+ time.sleep(wait_time)
+ previous_node_finished = monitor_key_exists(
+ 'osd-upgrade',
+ "{}_done".format(previous_node))
+
+
+def get_upgrade_position(osd_sorted_list, match_name):
+ for index, item in enumerate(osd_sorted_list):
+ if item.name == match_name:
+ return index
+ return None
+
+
+# Edge cases:
+# 1. Previous node dies on upgrade, can we retry?
+# 2. This assumes that the osd failure domain is not set to osd.
+# It rolls an entire server at a time.
+def roll_osd_cluster(new_version):
+ """
+ This is tricky to get right so here's what we're going to do.
+ There's 2 possible cases: Either I'm first in line or not.
+ If I'm not first in line I'll wait a random time between 5-30 seconds
+ and test to see if the previous osd is upgraded yet.
+
+ TODO: If you're not in the same failure domain it's safe to upgrade
+ 1. Examine all pools and adopt the most strict failure domain policy
+ Example: Pool 1: Failure domain = rack
+ Pool 2: Failure domain = host
+ Pool 3: Failure domain = row
+
+ outcome: Failure domain = host
+ """
+ log('roll_osd_cluster called with {}'.format(new_version))
+ my_name = socket.gethostname()
+ osd_tree = ceph.get_osd_tree(service='osd-upgrade')
+ # A sorted list of osd unit names
+ osd_sorted_list = sorted(osd_tree)
+ log("osd_sorted_list: {}".format(osd_sorted_list))
+
+ try:
+ position = get_upgrade_position(osd_sorted_list, my_name)
+ log("upgrade position: {}".format(position))
+ if position == 0:
+ # I'm first! Roll
+ # First set a key to inform others I'm about to roll
+ lock_and_roll(my_name=my_name)
+ else:
+ # Check if the previous node has finished
+ status_set('blocked',
+ 'Waiting on {} to finish upgrading'.format(
+ osd_sorted_list[position - 1].name))
+ wait_on_previous_node(
+ previous_node=osd_sorted_list[position - 1].name)
+ lock_and_roll(my_name=my_name)
+ except ValueError:
+ log("Failed to find name {} in list {}".format(
+ my_name, osd_sorted_list))
+ status_set('blocked', 'failed to upgrade osd')
+
+
+def upgrade_osd():
+ current_version = ceph.get_version()
+ status_set("maintenance", "Upgrading osd")
+ log("Current ceph version is {}".format(current_version))
+ new_version = config('release-version')
+ log("Upgrading to: {}".format(new_version))
+
+ try:
+ add_source(config('source'), config('key'))
+ apt_update(fatal=True)
+ except subprocess.CalledProcessError as err:
+ log("Adding the ceph source failed with message: {}".format(
+ err.message))
+ status_set("blocked", "Upgrade to {} failed".format(new_version))
+ sys.exit(1)
+ try:
+ if ceph.systemd():
+ for osd_id in ceph.get_local_osd_ids():
+ service_stop('ceph-osd@{}'.format(osd_id))
+ else:
+ service_stop('ceph-osd-all')
+ apt_install(packages=ceph.PACKAGES, fatal=True)
+ if ceph.systemd():
+ for osd_id in ceph.get_local_osd_ids():
+ service_start('ceph-osd@{}'.format(osd_id))
+ else:
+ service_start('ceph-osd-all')
+ except subprocess.CalledProcessError as err:
+ log("Stopping ceph and upgrading packages failed "
+ "with message: {}".format(err.message))
+ status_set("blocked", "Upgrade to {} failed".format(new_version))
+ sys.exit(1)
+
def install_upstart_scripts():
# Only install upstart configurations for older versions
@@ -124,6 +320,7 @@ def emit_cephconf():
install_alternative('ceph.conf', '/etc/ceph/ceph.conf',
charm_ceph_conf, 90)
+
JOURNAL_ZAPPED = '/var/lib/ceph/journal_zapped'
@@ -158,6 +355,9 @@ def check_overlap(journaldevs, datadevs):
@hooks.hook('config-changed')
def config_changed():
+ # Check if an upgrade was requested
+ check_for_upgrade()
+
# Pre-flight checks
if config('osd-format') not in ceph.DISK_FORMATS:
log('Invalid OSD disk format configuration specified', level=ERROR)
@@ -171,7 +371,7 @@ def config_changed():
create_sysctl(sysctl_dict, '/etc/sysctl.d/50-ceph-osd-charm.conf')
e_mountpoint = config('ephemeral-unmount')
- if (e_mountpoint and ceph.filesystem_mounted(e_mountpoint)):
+ if e_mountpoint and ceph.filesystem_mounted(e_mountpoint):
umount(e_mountpoint)
prepare_disks_and_activate()
@@ -201,8 +401,14 @@ def get_mon_hosts():
hosts = []
for relid in relation_ids('mon'):
for unit in related_units(relid):
- addr = relation_get('ceph-public-address', unit, relid) or \
- get_host_ip(relation_get('private-address', unit, relid))
+ addr = \
+ relation_get('ceph-public-address',
+ unit,
+ relid) or get_host_ip(
+ relation_get(
+ 'private-address',
+ unit,
+ relid))
if addr:
hosts.append('{}:6789'.format(format_ipv6_addr(addr) or addr))
@@ -258,10 +464,12 @@ def get_journal_devices():
'mon-relation-departed')
def mon_relation():
bootstrap_key = relation_get('osd_bootstrap_key')
+ upgrade_key = relation_get('osd_upgrade_key')
if get_fsid() and get_auth() and bootstrap_key:
log('mon has provided conf- scanning disks')
emit_cephconf()
ceph.import_osd_bootstrap_key(bootstrap_key)
+ ceph.import_osd_upgrade_key(upgrade_key)
prepare_disks_and_activate()
else:
log('mon cluster has not yet provided conf')
diff --git a/hooks/charmhelpers/contrib/storage/linux/ceph.py b/hooks/charmhelpers/contrib/storage/linux/ceph.py
new file mode 100644
index 00000000..f4582545
--- /dev/null
+++ b/hooks/charmhelpers/contrib/storage/linux/ceph.py
@@ -0,0 +1,1195 @@
+# Copyright 2014-2015 Canonical Limited.
+#
+# This file is part of charm-helpers.
+#
+# charm-helpers is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3 as
+# published by the Free Software Foundation.
+#
+# charm-helpers is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with charm-helpers. If not, see .
+
+#
+# Copyright 2012 Canonical Ltd.
+#
+# This file is sourced from lp:openstack-charm-helpers
+#
+# Authors:
+# James Page
+# Adam Gandelman
+#
+import bisect
+import errno
+import hashlib
+import six
+
+import os
+import shutil
+import json
+import time
+import uuid
+
+from subprocess import (
+ check_call,
+ check_output,
+ CalledProcessError,
+)
+from charmhelpers.core.hookenv import (
+ local_unit,
+ relation_get,
+ relation_ids,
+ relation_set,
+ related_units,
+ log,
+ DEBUG,
+ INFO,
+ WARNING,
+ ERROR,
+)
+from charmhelpers.core.host import (
+ mount,
+ mounts,
+ service_start,
+ service_stop,
+ service_running,
+ umount,
+)
+from charmhelpers.fetch import (
+ apt_install,
+)
+
+from charmhelpers.core.kernel import modprobe
+
+KEYRING = '/etc/ceph/ceph.client.{}.keyring'
+KEYFILE = '/etc/ceph/ceph.client.{}.key'
+
+CEPH_CONF = """[global]
+auth supported = {auth}
+keyring = {keyring}
+mon host = {mon_hosts}
+log to syslog = {use_syslog}
+err to syslog = {use_syslog}
+clog to syslog = {use_syslog}
+"""
+# For 50 < osds < 240,000 OSDs (Roughly 1 Exabyte at 6T OSDs)
+powers_of_two = [8192, 16384, 32768, 65536, 131072, 262144, 524288, 1048576, 2097152, 4194304, 8388608]
+
+
+def validator(value, valid_type, valid_range=None):
+ """
+ Used to validate these: http://docs.ceph.com/docs/master/rados/operations/pools/#set-pool-values
+ Example input:
+ validator(value=1,
+ valid_type=int,
+ valid_range=[0, 2])
+ This says I'm testing value=1. It must be an int inclusive in [0,2]
+
+ :param value: The value to validate
+ :param valid_type: The type that value should be.
+ :param valid_range: A range of values that value can assume.
+ :return:
+ """
+ assert isinstance(value, valid_type), "{} is not a {}".format(
+ value,
+ valid_type)
+ if valid_range is not None:
+ assert isinstance(valid_range, list), \
+ "valid_range must be a list, was given {}".format(valid_range)
+ # If we're dealing with strings
+ if valid_type is six.string_types:
+ assert value in valid_range, \
+ "{} is not in the list {}".format(value, valid_range)
+ # Integer, float should have a min and max
+ else:
+ if len(valid_range) != 2:
+ raise ValueError(
+ "Invalid valid_range list of {} for {}. "
+ "List must be [min,max]".format(valid_range, value))
+ assert value >= valid_range[0], \
+ "{} is less than minimum allowed value of {}".format(
+ value, valid_range[0])
+ assert value <= valid_range[1], \
+ "{} is greater than maximum allowed value of {}".format(
+ value, valid_range[1])
+
+
+class PoolCreationError(Exception):
+ """
+ A custom error to inform the caller that a pool creation failed. Provides an error message
+ """
+
+ def __init__(self, message):
+ super(PoolCreationError, self).__init__(message)
+
+
+class Pool(object):
+ """
+ An object oriented approach to Ceph pool creation. This base class is inherited by ReplicatedPool and ErasurePool.
+ Do not call create() on this base class as it will not do anything. Instantiate a child class and call create().
+ """
+
+ def __init__(self, service, name):
+ self.service = service
+ self.name = name
+
+ # Create the pool if it doesn't exist already
+ # To be implemented by subclasses
+ def create(self):
+ pass
+
+ def add_cache_tier(self, cache_pool, mode):
+ """
+ Adds a new cache tier to an existing pool.
+ :param cache_pool: six.string_types. The cache tier pool name to add.
+ :param mode: six.string_types. The caching mode to use for this pool. valid range = ["readonly", "writeback"]
+ :return: None
+ """
+ # Check the input types and values
+ validator(value=cache_pool, valid_type=six.string_types)
+ validator(value=mode, valid_type=six.string_types, valid_range=["readonly", "writeback"])
+
+ check_call(['ceph', '--id', self.service, 'osd', 'tier', 'add', self.name, cache_pool])
+ check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, mode])
+ check_call(['ceph', '--id', self.service, 'osd', 'tier', 'set-overlay', self.name, cache_pool])
+ check_call(['ceph', '--id', self.service, 'osd', 'pool', 'set', cache_pool, 'hit_set_type', 'bloom'])
+
+ def remove_cache_tier(self, cache_pool):
+ """
+ Removes a cache tier from Ceph. Flushes all dirty objects from writeback pools and waits for that to complete.
+ :param cache_pool: six.string_types. The cache tier pool name to remove.
+ :return: None
+ """
+ # read-only is easy, writeback is much harder
+ mode = get_cache_mode(self.service, cache_pool)
+ if mode == 'readonly':
+ check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'none'])
+ check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])
+
+ elif mode == 'writeback':
+ check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'forward'])
+ # Flush the cache and wait for it to return
+ check_call(['ceph', '--id', self.service, '-p', cache_pool, 'cache-flush-evict-all'])
+ check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove-overlay', self.name])
+ check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])
+
+ def get_pgs(self, pool_size):
+ """
+ :param pool_size: int. pool_size is either the number of replicas for replicated pools or the K+M sum for
+ erasure coded pools
+ :return: int. The number of pgs to use.
+ """
+ validator(value=pool_size, valid_type=int)
+ osd_list = get_osds(self.service)
+ if not osd_list:
+ # NOTE(james-page): Default to 200 for older ceph versions
+ # which don't support OSD query from cli
+ return 200
+
+ osd_list_length = len(osd_list)
+ # Calculate based on Ceph best practices
+ if osd_list_length < 5:
+ return 128
+ elif 5 < osd_list_length < 10:
+ return 512
+ elif 10 < osd_list_length < 50:
+ return 4096
+ else:
+ estimate = (osd_list_length * 100) / pool_size
+ # Return the next nearest power of 2
+ index = bisect.bisect_right(powers_of_two, estimate)
+ return powers_of_two[index]
+
+
+class ReplicatedPool(Pool):
+ def __init__(self, service, name, pg_num=None, replicas=2):
+ super(ReplicatedPool, self).__init__(service=service, name=name)
+ self.replicas = replicas
+ if pg_num is None:
+ self.pg_num = self.get_pgs(self.replicas)
+ else:
+ self.pg_num = pg_num
+
+ def create(self):
+ if not pool_exists(self.service, self.name):
+ # Create it
+ cmd = ['ceph', '--id', self.service, 'osd', 'pool', 'create',
+ self.name, str(self.pg_num)]
+ try:
+ check_call(cmd)
+ except CalledProcessError:
+ raise
+
+
+# Default jerasure erasure coded pool
+class ErasurePool(Pool):
+ def __init__(self, service, name, erasure_code_profile="default"):
+ super(ErasurePool, self).__init__(service=service, name=name)
+ self.erasure_code_profile = erasure_code_profile
+
+ def create(self):
+ if not pool_exists(self.service, self.name):
+ # Try to find the erasure profile information so we can properly size the pgs
+ erasure_profile = get_erasure_profile(service=self.service, name=self.erasure_code_profile)
+
+ # Check for errors
+ if erasure_profile is None:
+ log(message='Failed to discover erasure_profile named={}'.format(self.erasure_code_profile),
+ level=ERROR)
+ raise PoolCreationError(message='unable to find erasure profile {}'.format(self.erasure_code_profile))
+ if 'k' not in erasure_profile or 'm' not in erasure_profile:
+ # Error
+ log(message='Unable to find k (data chunks) or m (coding chunks) in {}'.format(erasure_profile),
+ level=ERROR)
+ raise PoolCreationError(
+ message='unable to find k (data chunks) or m (coding chunks) in {}'.format(erasure_profile))
+
+ pgs = self.get_pgs(int(erasure_profile['k']) + int(erasure_profile['m']))
+ # Create it
+ cmd = ['ceph', '--id', self.service, 'osd', 'pool', 'create', self.name, str(pgs), str(pgs),
+ 'erasure', self.erasure_code_profile]
+ try:
+ check_call(cmd)
+ except CalledProcessError:
+ raise
+
+ """Get an existing erasure code profile if it already exists.
+ Returns json formatted output"""
+
+
+def get_mon_map(service):
+ """
+ Returns the current monitor map.
+ :param service: six.string_types. The Ceph user name to run the command under
+ :return: json string. :raise: ValueError if the monmap fails to parse.
+ Also raises CalledProcessError if our ceph command fails
+ """
+ try:
+ mon_status = check_output(
+ ['ceph', '--id', service,
+ 'mon_status', '--format=json'])
+ try:
+ return json.loads(mon_status)
+ except ValueError as v:
+ log("Unable to parse mon_status json: {}. Error: {}".format(
+ mon_status, v.message))
+ raise
+ except CalledProcessError as e:
+ log("mon_status command failed with message: {}".format(
+ e.message))
+ raise
+
+
+def hash_monitor_names(service):
+ """
+ Uses the get_mon_map() function to get information about the monitor
+ cluster.
+ Hash the name of each monitor. Return a sorted list of monitor hashes
+ in an ascending order.
+ :param service: six.string_types. The Ceph user name to run the command under
+ :rtype : dict. json dict of monitor name, ip address and rank
+ example: {
+ 'name': 'ip-172-31-13-165',
+ 'rank': 0,
+ 'addr': '172.31.13.165:6789/0'}
+ """
+ try:
+ hash_list = []
+ monitor_list = get_mon_map(service=service)
+ if monitor_list['monmap']['mons']:
+ for mon in monitor_list['monmap']['mons']:
+ hash_list.append(
+ hashlib.sha224(mon['name'].encode('utf-8')).hexdigest())
+ return sorted(hash_list)
+ else:
+ return None
+ except (ValueError, CalledProcessError):
+ raise
+
+
+def monitor_key_delete(service, key):
+ """
+ Delete a key and value pair from the monitor cluster
+ :param service: six.string_types. The Ceph user name to run the command under
+ Deletes a key value pair on the monitor cluster.
+ :param key: six.string_types. The key to delete.
+ """
+ try:
+ check_output(
+ ['ceph', '--id', service,
+ 'config-key', 'del', str(key)])
+ except CalledProcessError as e:
+ log("Monitor config-key put failed with message: {}".format(
+ e.output))
+ raise
+
+
+def monitor_key_set(service, key, value):
+ """
+ Sets a key value pair on the monitor cluster.
+ :param service: six.string_types. The Ceph user name to run the command under
+ :param key: six.string_types. The key to set.
+ :param value: The value to set. This will be converted to a string
+ before setting
+ """
+ try:
+ check_output(
+ ['ceph', '--id', service,
+ 'config-key', 'put', str(key), str(value)])
+ except CalledProcessError as e:
+ log("Monitor config-key put failed with message: {}".format(
+ e.output))
+ raise
+
+
+def monitor_key_get(service, key):
+ """
+ Gets the value of an existing key in the monitor cluster.
+ :param service: six.string_types. The Ceph user name to run the command under
+ :param key: six.string_types. The key to search for.
+ :return: Returns the value of that key or None if not found.
+ """
+ try:
+ output = check_output(
+ ['ceph', '--id', service,
+ 'config-key', 'get', str(key)])
+ return output
+ except CalledProcessError as e:
+ log("Monitor config-key get failed with message: {}".format(
+ e.output))
+ return None
+
+
+def monitor_key_exists(service, key):
+ """
+ Searches for the existence of a key in the monitor cluster.
+ :param service: six.string_types. The Ceph user name to run the command under
+ :param key: six.string_types. The key to search for
+ :return: Returns True if the key exists, False if not and raises an
+ exception if an unknown error occurs. :raise: CalledProcessError if
+ an unknown error occurs
+ """
+ try:
+ check_call(
+ ['ceph', '--id', service,
+ 'config-key', 'exists', str(key)])
+ # I can return true here regardless because Ceph returns
+ # ENOENT if the key wasn't found
+ return True
+ except CalledProcessError as e:
+ if e.returncode == errno.ENOENT:
+ return False
+ else:
+ log("Unknown error from ceph config-get exists: {} {}".format(
+ e.returncode, e.output))
+ raise
+
+
+def get_erasure_profile(service, name):
+ """
+ :param service: six.string_types. The Ceph user name to run the command under
+ :param name:
+ :return:
+ """
+ try:
+ out = check_output(['ceph', '--id', service,
+ 'osd', 'erasure-code-profile', 'get',
+ name, '--format=json'])
+ return json.loads(out)
+ except (CalledProcessError, OSError, ValueError):
+ return None
+
+
+def pool_set(service, pool_name, key, value):
+ """
+ Sets a value for a RADOS pool in ceph.
+ :param service: six.string_types. The Ceph user name to run the command under
+ :param pool_name: six.string_types
+ :param key: six.string_types
+ :param value:
+ :return: None. Can raise CalledProcessError
+ """
+ cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', pool_name, key, value]
+ try:
+ check_call(cmd)
+ except CalledProcessError:
+ raise
+
+
+def snapshot_pool(service, pool_name, snapshot_name):
+ """
+ Snapshots a RADOS pool in ceph.
+ :param service: six.string_types. The Ceph user name to run the command under
+ :param pool_name: six.string_types
+ :param snapshot_name: six.string_types
+ :return: None. Can raise CalledProcessError
+ """
+ cmd = ['ceph', '--id', service, 'osd', 'pool', 'mksnap', pool_name, snapshot_name]
+ try:
+ check_call(cmd)
+ except CalledProcessError:
+ raise
+
+
+def remove_pool_snapshot(service, pool_name, snapshot_name):
+ """
+ Remove a snapshot from a RADOS pool in ceph.
+ :param service: six.string_types. The Ceph user name to run the command under
+ :param pool_name: six.string_types
+ :param snapshot_name: six.string_types
+ :return: None. Can raise CalledProcessError
+ """
+ cmd = ['ceph', '--id', service, 'osd', 'pool', 'rmsnap', pool_name, snapshot_name]
+ try:
+ check_call(cmd)
+ except CalledProcessError:
+ raise
+
+
+# max_bytes should be an int or long
+def set_pool_quota(service, pool_name, max_bytes):
+ """
+ :param service: six.string_types. The Ceph user name to run the command under
+ :param pool_name: six.string_types
+ :param max_bytes: int or long
+ :return: None. Can raise CalledProcessError
+ """
+ # Set a byte quota on a RADOS pool in ceph.
+ cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name,
+ 'max_bytes', str(max_bytes)]
+ try:
+ check_call(cmd)
+ except CalledProcessError:
+ raise
+
+
+def remove_pool_quota(service, pool_name):
+ """
+ Set a byte quota on a RADOS pool in ceph.
+ :param service: six.string_types. The Ceph user name to run the command under
+ :param pool_name: six.string_types
+ :return: None. Can raise CalledProcessError
+ """
+ cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name, 'max_bytes', '0']
+ try:
+ check_call(cmd)
+ except CalledProcessError:
+ raise
+
+
+def remove_erasure_profile(service, profile_name):
+ """
+ Create a new erasure code profile if one does not already exist for it. Updates
+ the profile if it exists. Please see http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/
+ for more details
+ :param service: six.string_types. The Ceph user name to run the command under
+ :param profile_name: six.string_types
+ :return: None. Can raise CalledProcessError
+ """
+ cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'rm',
+ profile_name]
+ try:
+ check_call(cmd)
+ except CalledProcessError:
+ raise
+
+
+def create_erasure_profile(service, profile_name, erasure_plugin_name='jerasure',
+ failure_domain='host',
+ data_chunks=2, coding_chunks=1,
+ locality=None, durability_estimator=None):
+ """
+ Create a new erasure code profile if one does not already exist for it. Updates
+ the profile if it exists. Please see http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/
+ for more details
+ :param service: six.string_types. The Ceph user name to run the command under
+ :param profile_name: six.string_types
+ :param erasure_plugin_name: six.string_types
+ :param failure_domain: six.string_types. One of ['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region',
+ 'room', 'root', 'row'])
+ :param data_chunks: int
+ :param coding_chunks: int
+ :param locality: int
+ :param durability_estimator: int
+ :return: None. Can raise CalledProcessError
+ """
+ # Ensure this failure_domain is allowed by Ceph
+ validator(failure_domain, six.string_types,
+ ['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region', 'room', 'root', 'row'])
+
+ cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'set', profile_name,
+ 'plugin=' + erasure_plugin_name, 'k=' + str(data_chunks), 'm=' + str(coding_chunks),
+ 'ruleset_failure_domain=' + failure_domain]
+ if locality is not None and durability_estimator is not None:
+ raise ValueError("create_erasure_profile should be called with k, m and one of l or c but not both.")
+
+ # Add plugin specific information
+ if locality is not None:
+ # For local erasure codes
+ cmd.append('l=' + str(locality))
+ if durability_estimator is not None:
+ # For Shec erasure codes
+ cmd.append('c=' + str(durability_estimator))
+
+ if erasure_profile_exists(service, profile_name):
+ cmd.append('--force')
+
+ try:
+ check_call(cmd)
+ except CalledProcessError:
+ raise
+
+
+def rename_pool(service, old_name, new_name):
+ """
+ Rename a Ceph pool from old_name to new_name
+ :param service: six.string_types. The Ceph user name to run the command under
+ :param old_name: six.string_types
+ :param new_name: six.string_types
+ :return: None
+ """
+ validator(value=old_name, valid_type=six.string_types)
+ validator(value=new_name, valid_type=six.string_types)
+
+ cmd = ['ceph', '--id', service, 'osd', 'pool', 'rename', old_name, new_name]
+ check_call(cmd)
+
+
+def erasure_profile_exists(service, name):
+ """
+ Check to see if an Erasure code profile already exists.
+ :param service: six.string_types. The Ceph user name to run the command under
+ :param name: six.string_types
+ :return: int or None
+ """
+ validator(value=name, valid_type=six.string_types)
+ try:
+ check_call(['ceph', '--id', service,
+ 'osd', 'erasure-code-profile', 'get',
+ name])
+ return True
+ except CalledProcessError:
+ return False
+
+
+def get_cache_mode(service, pool_name):
+ """
+ Find the current caching mode of the pool_name given.
+ :param service: six.string_types. The Ceph user name to run the command under
+ :param pool_name: six.string_types
+ :return: int or None
+ """
+ validator(value=service, valid_type=six.string_types)
+ validator(value=pool_name, valid_type=six.string_types)
+ out = check_output(['ceph', '--id', service, 'osd', 'dump', '--format=json'])
+ try:
+ osd_json = json.loads(out)
+ for pool in osd_json['pools']:
+ if pool['pool_name'] == pool_name:
+ return pool['cache_mode']
+ return None
+ except ValueError:
+ raise
+
+
+def pool_exists(service, name):
+ """Check to see if a RADOS pool already exists."""
+ try:
+ out = check_output(['rados', '--id', service,
+ 'lspools']).decode('UTF-8')
+ except CalledProcessError:
+ return False
+
+ return name in out
+
+
+def get_osds(service):
+ """Return a list of all Ceph Object Storage Daemons currently in the
+ cluster.
+ """
+ version = ceph_version()
+ if version and version >= '0.56':
+ return json.loads(check_output(['ceph', '--id', service,
+ 'osd', 'ls',
+ '--format=json']).decode('UTF-8'))
+
+ return None
+
+
+def install():
+ """Basic Ceph client installation."""
+ ceph_dir = "/etc/ceph"
+ if not os.path.exists(ceph_dir):
+ os.mkdir(ceph_dir)
+
+ apt_install('ceph-common', fatal=True)
+
+
+def rbd_exists(service, pool, rbd_img):
+ """Check to see if a RADOS block device exists."""
+ try:
+ out = check_output(['rbd', 'list', '--id',
+ service, '--pool', pool]).decode('UTF-8')
+ except CalledProcessError:
+ return False
+
+ return rbd_img in out
+
+
+def create_rbd_image(service, pool, image, sizemb):
+ """Create a new RADOS block device."""
+ cmd = ['rbd', 'create', image, '--size', str(sizemb), '--id', service,
+ '--pool', pool]
+ check_call(cmd)
+
+
+def update_pool(client, pool, settings):
+ cmd = ['ceph', '--id', client, 'osd', 'pool', 'set', pool]
+ for k, v in six.iteritems(settings):
+ cmd.append(k)
+ cmd.append(v)
+
+ check_call(cmd)
+
+
+def create_pool(service, name, replicas=3, pg_num=None):
+ """Create a new RADOS pool."""
+ if pool_exists(service, name):
+ log("Ceph pool {} already exists, skipping creation".format(name),
+ level=WARNING)
+ return
+
+ if not pg_num:
+ # Calculate the number of placement groups based
+ # on upstream recommended best practices.
+ osds = get_osds(service)
+ if osds:
+ pg_num = (len(osds) * 100 // replicas)
+ else:
+ # NOTE(james-page): Default to 200 for older ceph versions
+ # which don't support OSD query from cli
+ pg_num = 200
+
+ cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pg_num)]
+ check_call(cmd)
+
+ update_pool(service, name, settings={'size': str(replicas)})
+
+
+def delete_pool(service, name):
+ """Delete a RADOS pool from ceph."""
+ cmd = ['ceph', '--id', service, 'osd', 'pool', 'delete', name,
+ '--yes-i-really-really-mean-it']
+ check_call(cmd)
+
+
+def _keyfile_path(service):
+ return KEYFILE.format(service)
+
+
+def _keyring_path(service):
+ return KEYRING.format(service)
+
+
+def create_keyring(service, key):
+ """Create a new Ceph keyring containing key."""
+ keyring = _keyring_path(service)
+ if os.path.exists(keyring):
+ log('Ceph keyring exists at %s.' % keyring, level=WARNING)
+ return
+
+ cmd = ['ceph-authtool', keyring, '--create-keyring',
+ '--name=client.{}'.format(service), '--add-key={}'.format(key)]
+ check_call(cmd)
+ log('Created new ceph keyring at %s.' % keyring, level=DEBUG)
+
+
+def delete_keyring(service):
+ """Delete an existing Ceph keyring."""
+ keyring = _keyring_path(service)
+ if not os.path.exists(keyring):
+ log('Keyring does not exist at %s' % keyring, level=WARNING)
+ return
+
+ os.remove(keyring)
+ log('Deleted ring at %s.' % keyring, level=INFO)
+
+
+def create_key_file(service, key):
+ """Create a file containing key."""
+ keyfile = _keyfile_path(service)
+ if os.path.exists(keyfile):
+ log('Keyfile exists at %s.' % keyfile, level=WARNING)
+ return
+
+ with open(keyfile, 'w') as fd:
+ fd.write(key)
+
+ log('Created new keyfile at %s.' % keyfile, level=INFO)
+
+
+def get_ceph_nodes(relation='ceph'):
+ """Query named relation to determine current nodes."""
+ hosts = []
+ for r_id in relation_ids(relation):
+ for unit in related_units(r_id):
+ hosts.append(relation_get('private-address', unit=unit, rid=r_id))
+
+ return hosts
+
+
+def configure(service, key, auth, use_syslog):
+ """Perform basic configuration of Ceph."""
+ create_keyring(service, key)
+ create_key_file(service, key)
+ hosts = get_ceph_nodes()
+ with open('/etc/ceph/ceph.conf', 'w') as ceph_conf:
+ ceph_conf.write(CEPH_CONF.format(auth=auth,
+ keyring=_keyring_path(service),
+ mon_hosts=",".join(map(str, hosts)),
+ use_syslog=use_syslog))
+ modprobe('rbd')
+
+
+def image_mapped(name):
+ """Determine whether a RADOS block device is mapped locally."""
+ try:
+ out = check_output(['rbd', 'showmapped']).decode('UTF-8')
+ except CalledProcessError:
+ return False
+
+ return name in out
+
+
+def map_block_storage(service, pool, image):
+ """Map a RADOS block device for local use."""
+ cmd = [
+ 'rbd',
+ 'map',
+ '{}/{}'.format(pool, image),
+ '--user',
+ service,
+ '--secret',
+ _keyfile_path(service),
+ ]
+ check_call(cmd)
+
+
+def filesystem_mounted(fs):
+ """Determine whether a filesytems is already mounted."""
+ return fs in [f for f, m in mounts()]
+
+
+def make_filesystem(blk_device, fstype='ext4', timeout=10):
+ """Make a new filesystem on the specified block device."""
+ count = 0
+ e_noent = os.errno.ENOENT
+ while not os.path.exists(blk_device):
+ if count >= timeout:
+ log('Gave up waiting on block device %s' % blk_device,
+ level=ERROR)
+ raise IOError(e_noent, os.strerror(e_noent), blk_device)
+
+ log('Waiting for block device %s to appear' % blk_device,
+ level=DEBUG)
+ count += 1
+ time.sleep(1)
+ else:
+ log('Formatting block device %s as filesystem %s.' %
+ (blk_device, fstype), level=INFO)
+ check_call(['mkfs', '-t', fstype, blk_device])
+
+
+def place_data_on_block_device(blk_device, data_src_dst):
+ """Migrate data in data_src_dst to blk_device and then remount."""
+ # mount block device into /mnt
+ mount(blk_device, '/mnt')
+ # copy data to /mnt
+ copy_files(data_src_dst, '/mnt')
+ # umount block device
+ umount('/mnt')
+ # Grab user/group ID's from original source
+ _dir = os.stat(data_src_dst)
+ uid = _dir.st_uid
+ gid = _dir.st_gid
+ # re-mount where the data should originally be
+ # TODO: persist is currently a NO-OP in core.host
+ mount(blk_device, data_src_dst, persist=True)
+ # ensure original ownership of new mount.
+ os.chown(data_src_dst, uid, gid)
+
+
+def copy_files(src, dst, symlinks=False, ignore=None):
+ """Copy files from src to dst."""
+ for item in os.listdir(src):
+ s = os.path.join(src, item)
+ d = os.path.join(dst, item)
+ if os.path.isdir(s):
+ shutil.copytree(s, d, symlinks, ignore)
+ else:
+ shutil.copy2(s, d)
+
+
+def ensure_ceph_storage(service, pool, rbd_img, sizemb, mount_point,
+ blk_device, fstype, system_services=[],
+ replicas=3):
+ """NOTE: This function must only be called from a single service unit for
+ the same rbd_img otherwise data loss will occur.
+
+ Ensures given pool and RBD image exists, is mapped to a block device,
+ and the device is formatted and mounted at the given mount_point.
+
+ If formatting a device for the first time, data existing at mount_point
+ will be migrated to the RBD device before being re-mounted.
+
+ All services listed in system_services will be stopped prior to data
+ migration and restarted when complete.
+ """
+ # Ensure pool, RBD image, RBD mappings are in place.
+ if not pool_exists(service, pool):
+ log('Creating new pool {}.'.format(pool), level=INFO)
+ create_pool(service, pool, replicas=replicas)
+
+ if not rbd_exists(service, pool, rbd_img):
+ log('Creating RBD image ({}).'.format(rbd_img), level=INFO)
+ create_rbd_image(service, pool, rbd_img, sizemb)
+
+ if not image_mapped(rbd_img):
+ log('Mapping RBD Image {} as a Block Device.'.format(rbd_img),
+ level=INFO)
+ map_block_storage(service, pool, rbd_img)
+
+ # make file system
+ # TODO: What happens if for whatever reason this is run again and
+ # the data is already in the rbd device and/or is mounted??
+ # When it is mounted already, it will fail to make the fs
+ # XXX: This is really sketchy! Need to at least add an fstab entry
+ # otherwise this hook will blow away existing data if its executed
+ # after a reboot.
+ if not filesystem_mounted(mount_point):
+ make_filesystem(blk_device, fstype)
+
+ for svc in system_services:
+ if service_running(svc):
+ log('Stopping services {} prior to migrating data.'
+ .format(svc), level=DEBUG)
+ service_stop(svc)
+
+ place_data_on_block_device(blk_device, mount_point)
+
+ for svc in system_services:
+ log('Starting service {} after migrating data.'
+ .format(svc), level=DEBUG)
+ service_start(svc)
+
+
+def ensure_ceph_keyring(service, user=None, group=None, relation='ceph'):
+ """Ensures a ceph keyring is created for a named service and optionally
+ ensures user and group ownership.
+
+ Returns False if no ceph key is available in relation state.
+ """
+ key = None
+ for rid in relation_ids(relation):
+ for unit in related_units(rid):
+ key = relation_get('key', rid=rid, unit=unit)
+ if key:
+ break
+
+ if not key:
+ return False
+
+ create_keyring(service=service, key=key)
+ keyring = _keyring_path(service)
+ if user and group:
+ check_call(['chown', '%s.%s' % (user, group), keyring])
+
+ return True
+
+
+def ceph_version():
+ """Retrieve the local version of ceph."""
+ if os.path.exists('/usr/bin/ceph'):
+ cmd = ['ceph', '-v']
+ output = check_output(cmd).decode('US-ASCII')
+ output = output.split()
+ if len(output) > 3:
+ return output[2]
+ else:
+ return None
+ else:
+ return None
+
+
+class CephBrokerRq(object):
+ """Ceph broker request.
+
+ Multiple operations can be added to a request and sent to the Ceph broker
+ to be executed.
+
+ Request is json-encoded for sending over the wire.
+
+ The API is versioned and defaults to version 1.
+ """
+
+ def __init__(self, api_version=1, request_id=None):
+ self.api_version = api_version
+ if request_id:
+ self.request_id = request_id
+ else:
+ self.request_id = str(uuid.uuid1())
+ self.ops = []
+
+ def add_op_create_pool(self, name, replica_count=3, pg_num=None):
+ """Adds an operation to create a pool.
+
+ @param pg_num setting: optional setting. If not provided, this value
+ will be calculated by the broker based on how many OSDs are in the
+ cluster at the time of creation. Note that, if provided, this value
+ will be capped at the current available maximum.
+ """
+ self.ops.append({'op': 'create-pool', 'name': name,
+ 'replicas': replica_count, 'pg_num': pg_num})
+
+ def set_ops(self, ops):
+ """Set request ops to provided value.
+
+ Useful for injecting ops that come from a previous request
+ to allow comparisons to ensure validity.
+ """
+ self.ops = ops
+
+ @property
+ def request(self):
+ return json.dumps({'api-version': self.api_version, 'ops': self.ops,
+ 'request-id': self.request_id})
+
+ def _ops_equal(self, other):
+ if len(self.ops) == len(other.ops):
+ for req_no in range(0, len(self.ops)):
+ for key in ['replicas', 'name', 'op', 'pg_num']:
+ if self.ops[req_no].get(key) != other.ops[req_no].get(key):
+ return False
+ else:
+ return False
+ return True
+
+ def __eq__(self, other):
+ if not isinstance(other, self.__class__):
+ return False
+ if self.api_version == other.api_version and \
+ self._ops_equal(other):
+ return True
+ else:
+ return False
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+
+class CephBrokerRsp(object):
+ """Ceph broker response.
+
+ Response is json-decoded and contents provided as methods/properties.
+
+ The API is versioned and defaults to version 1.
+ """
+
+ def __init__(self, encoded_rsp):
+ self.api_version = None
+ self.rsp = json.loads(encoded_rsp)
+
+ @property
+ def request_id(self):
+ return self.rsp.get('request-id')
+
+ @property
+ def exit_code(self):
+ return self.rsp.get('exit-code')
+
+ @property
+ def exit_msg(self):
+ return self.rsp.get('stderr')
+
+
+# Ceph Broker Conversation:
+# If a charm needs an action to be taken by ceph it can create a CephBrokerRq
+# and send that request to ceph via the ceph relation. The CephBrokerRq has a
+# unique id so that the client can identity which CephBrokerRsp is associated
+# with the request. Ceph will also respond to each client unit individually
+# creating a response key per client unit eg glance/0 will get a CephBrokerRsp
+# via key broker-rsp-glance-0
+#
+# To use this the charm can just do something like:
+#
+# from charmhelpers.contrib.storage.linux.ceph import (
+# send_request_if_needed,
+# is_request_complete,
+# CephBrokerRq,
+# )
+#
+# @hooks.hook('ceph-relation-changed')
+# def ceph_changed():
+# rq = CephBrokerRq()
+# rq.add_op_create_pool(name='poolname', replica_count=3)
+#
+# if is_request_complete(rq):
+#
+# else:
+# send_request_if_needed(get_ceph_request())
+#
+# CephBrokerRq and CephBrokerRsp are serialized into JSON. Below is an example
+# of glance having sent a request to ceph which ceph has successfully processed
+# 'ceph:8': {
+# 'ceph/0': {
+# 'auth': 'cephx',
+# 'broker-rsp-glance-0': '{"request-id": "0bc7dc54", "exit-code": 0}',
+# 'broker_rsp': '{"request-id": "0da543b8", "exit-code": 0}',
+# 'ceph-public-address': '10.5.44.103',
+# 'key': 'AQCLDttVuHXINhAAvI144CB09dYchhHyTUY9BQ==',
+# 'private-address': '10.5.44.103',
+# },
+# 'glance/0': {
+# 'broker_req': ('{"api-version": 1, "request-id": "0bc7dc54", '
+# '"ops": [{"replicas": 3, "name": "glance", '
+# '"op": "create-pool"}]}'),
+# 'private-address': '10.5.44.109',
+# },
+# }
+
+def get_previous_request(rid):
+ """Return the last ceph broker request sent on a given relation
+
+ @param rid: Relation id to query for request
+ """
+ request = None
+ broker_req = relation_get(attribute='broker_req', rid=rid,
+ unit=local_unit())
+ if broker_req:
+ request_data = json.loads(broker_req)
+ request = CephBrokerRq(api_version=request_data['api-version'],
+ request_id=request_data['request-id'])
+ request.set_ops(request_data['ops'])
+
+ return request
+
+
+def get_request_states(request, relation='ceph'):
+ """Return a dict of requests per relation id with their corresponding
+ completion state.
+
+ This allows a charm, which has a request for ceph, to see whether there is
+ an equivalent request already being processed and if so what state that
+ request is in.
+
+ @param request: A CephBrokerRq object
+ """
+ complete = []
+ requests = {}
+ for rid in relation_ids(relation):
+ complete = False
+ previous_request = get_previous_request(rid)
+ if request == previous_request:
+ sent = True
+ complete = is_request_complete_for_rid(previous_request, rid)
+ else:
+ sent = False
+ complete = False
+
+ requests[rid] = {
+ 'sent': sent,
+ 'complete': complete,
+ }
+
+ return requests
+
+
+def is_request_sent(request, relation='ceph'):
+ """Check to see if a functionally equivalent request has already been sent
+
+ Returns True if a similair request has been sent
+
+ @param request: A CephBrokerRq object
+ """
+ states = get_request_states(request, relation=relation)
+ for rid in states.keys():
+ if not states[rid]['sent']:
+ return False
+
+ return True
+
+
+def is_request_complete(request, relation='ceph'):
+ """Check to see if a functionally equivalent request has already been
+ completed
+
+ Returns True if a similair request has been completed
+
+ @param request: A CephBrokerRq object
+ """
+ states = get_request_states(request, relation=relation)
+ for rid in states.keys():
+ if not states[rid]['complete']:
+ return False
+
+ return True
+
+
+def is_request_complete_for_rid(request, rid):
+ """Check if a given request has been completed on the given relation
+
+ @param request: A CephBrokerRq object
+ @param rid: Relation ID
+ """
+ broker_key = get_broker_rsp_key()
+ for unit in related_units(rid):
+ rdata = relation_get(rid=rid, unit=unit)
+ if rdata.get(broker_key):
+ rsp = CephBrokerRsp(rdata.get(broker_key))
+ if rsp.request_id == request.request_id:
+ if not rsp.exit_code:
+ return True
+ else:
+ # The remote unit sent no reply targeted at this unit so either the
+ # remote ceph cluster does not support unit targeted replies or it
+ # has not processed our request yet.
+ if rdata.get('broker_rsp'):
+ request_data = json.loads(rdata['broker_rsp'])
+ if request_data.get('request-id'):
+ log('Ignoring legacy broker_rsp without unit key as remote '
+ 'service supports unit specific replies', level=DEBUG)
+ else:
+ log('Using legacy broker_rsp as remote service does not '
+ 'supports unit specific replies', level=DEBUG)
+ rsp = CephBrokerRsp(rdata['broker_rsp'])
+ if not rsp.exit_code:
+ return True
+
+ return False
+
+
+def get_broker_rsp_key():
+ """Return broker response key for this unit
+
+ This is the key that ceph is going to use to pass request status
+ information back to this unit
+ """
+ return 'broker-rsp-' + local_unit().replace('/', '-')
+
+
+def send_request_if_needed(request, relation='ceph'):
+ """Send broker request if an equivalent request has not already been sent
+
+ @param request: A CephBrokerRq object
+ """
+ if is_request_sent(request, relation=relation):
+ log('Request already sent but not complete, not sending new request',
+ level=DEBUG)
+ else:
+ for rid in relation_ids(relation):
+ log('Sending request {}'.format(request.request_id), level=DEBUG)
+ relation_set(relation_id=rid, broker_req=request.request)
diff --git a/templates/ceph.conf b/templates/ceph.conf
index 66da0aca..7fec00e5 100644
--- a/templates/ceph.conf
+++ b/templates/ceph.conf
@@ -33,6 +33,8 @@ cluster addr = {{ cluster_addr }}
osd crush location = {{crush_location}}
{% endif %}
+[client.osd-upgrade]
+keyring = /var/lib/ceph/osd/ceph.client.osd-upgrade.keyring
[mon]
keyring = /var/lib/ceph/mon/$cluster-$id/keyring
diff --git a/tests/basic_deployment.py b/tests/basic_deployment.py
index 49a10b11..7800a00d 100644
--- a/tests/basic_deployment.py
+++ b/tests/basic_deployment.py
@@ -43,8 +43,8 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment):
and the rest of the service are from lp branches that are
compatible with the local charm (e.g. stable or next).
"""
- this_service = {'name': 'ceph-osd'}
- other_services = [{'name': 'ceph', 'units': 3},
+ this_service = {'name': 'ceph-osd', 'units': 3}
+ other_services = [{'name': 'ceph-mon', 'units': 3},
{'name': 'mysql'},
{'name': 'keystone'},
{'name': 'rabbitmq-server'},
@@ -60,18 +60,18 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment):
'nova-compute:shared-db': 'mysql:shared-db',
'nova-compute:amqp': 'rabbitmq-server:amqp',
'nova-compute:image-service': 'glance:image-service',
- 'nova-compute:ceph': 'ceph:client',
+ 'nova-compute:ceph': 'ceph-mon:client',
'keystone:shared-db': 'mysql:shared-db',
'glance:shared-db': 'mysql:shared-db',
'glance:identity-service': 'keystone:identity-service',
'glance:amqp': 'rabbitmq-server:amqp',
- 'glance:ceph': 'ceph:client',
+ 'glance:ceph': 'ceph-mon:client',
'cinder:shared-db': 'mysql:shared-db',
'cinder:identity-service': 'keystone:identity-service',
'cinder:amqp': 'rabbitmq-server:amqp',
'cinder:image-service': 'glance:image-service',
- 'cinder:ceph': 'ceph:client',
- 'ceph-osd:mon': 'ceph:osd'
+ 'cinder:ceph': 'ceph-mon:client',
+ 'ceph-osd:mon': 'ceph-mon:osd'
}
super(CephOsdBasicDeployment, self)._add_relations(relations)
@@ -86,9 +86,6 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment):
'auth-supported': 'none',
'fsid': '6547bd3e-1397-11e2-82e5-53567c8d32dc',
'monitor-secret': 'AQCXrnZQwI7KGBAAiPofmKEXKxu5bUzoYLVkbQ==',
- 'osd-reformat': 'yes',
- 'ephemeral-unmount': '/mnt',
- 'osd-devices': '/dev/vdb /srv/ceph'
}
# Include a non-existent device as osd-devices is a whitelist,
@@ -102,7 +99,7 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment):
configs = {'keystone': keystone_config,
'mysql': mysql_config,
'cinder': cinder_config,
- 'ceph': ceph_config,
+ 'ceph-mon': ceph_config,
'ceph-osd': ceph_osd_config}
super(CephOsdBasicDeployment, self)._configure_services(configs)
@@ -115,10 +112,12 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment):
self.nova_sentry = self.d.sentry.unit['nova-compute/0']
self.glance_sentry = self.d.sentry.unit['glance/0']
self.cinder_sentry = self.d.sentry.unit['cinder/0']
- self.ceph0_sentry = self.d.sentry.unit['ceph/0']
- self.ceph1_sentry = self.d.sentry.unit['ceph/1']
- self.ceph2_sentry = self.d.sentry.unit['ceph/2']
+ self.ceph0_sentry = self.d.sentry.unit['ceph-mon/0']
+ self.ceph1_sentry = self.d.sentry.unit['ceph-mon/1']
+ self.ceph2_sentry = self.d.sentry.unit['ceph-mon/2']
self.ceph_osd_sentry = self.d.sentry.unit['ceph-osd/0']
+ self.ceph_osd1_sentry = self.d.sentry.unit['ceph-osd/1']
+ self.ceph_osd2_sentry = self.d.sentry.unit['ceph-osd/2']
u.log.debug('openstack release val: {}'.format(
self._get_openstack_release()))
u.log.debug('openstack release str: {}'.format(
@@ -177,7 +176,6 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment):
# Process name and quantity of processes to expect on each unit
ceph_processes = {
'ceph-mon': 1,
- 'ceph-osd': 2
}
# Units with process names and PID quantities expected
@@ -214,9 +212,6 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment):
ceph_services = [
'ceph-mon-all',
'ceph-mon id=`hostname`',
- 'ceph-osd-all',
- 'ceph-osd id={}'.format(u.get_ceph_osd_id_cmd(0)),
- 'ceph-osd id={}'.format(u.get_ceph_osd_id_cmd(1))
]
services[self.ceph0_sentry] = ceph_services
services[self.ceph1_sentry] = ceph_services
@@ -233,16 +228,16 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment):
def test_200_ceph_osd_ceph_relation(self):
"""Verify the ceph-osd to ceph relation data."""
- u.log.debug('Checking ceph-osd:ceph mon relation data...')
+ u.log.debug('Checking ceph-osd:ceph-mon relation data...')
unit = self.ceph_osd_sentry
- relation = ['mon', 'ceph:osd']
+ relation = ['mon', 'ceph-mon:osd']
expected = {
'private-address': u.valid_ip
}
ret = u.validate_relation_data(unit, relation, expected)
if ret:
- message = u.relation_error('ceph-osd to ceph', ret)
+ message = u.relation_error('ceph-osd to ceph-mon', ret)
amulet.raise_status(amulet.FAIL, msg=message)
def test_201_ceph0_to_ceph_osd_relation(self):
diff --git a/unit_tests/test_upgrade_roll.py b/unit_tests/test_upgrade_roll.py
new file mode 100644
index 00000000..840e247c
--- /dev/null
+++ b/unit_tests/test_upgrade_roll.py
@@ -0,0 +1,157 @@
+import time
+
+__author__ = 'chris'
+from mock import patch, call, MagicMock
+import sys
+
+sys.path.append('/home/chris/repos/ceph-osd/hooks')
+
+from ceph import CrushLocation
+
+import test_utils
+import ceph_hooks
+
+TO_PATCH = [
+ 'apt_install',
+ 'apt_update',
+ 'add_source',
+ 'config',
+ 'ceph',
+ 'get_conf',
+ 'hookenv',
+ 'host',
+ 'log',
+ 'service_start',
+ 'service_stop',
+ 'socket',
+ 'status_set',
+]
+
+
+def config_side_effect(*args):
+ if args[0] == 'source':
+ return 'cloud:trusty-kilo'
+ elif args[0] == 'key':
+ return 'key'
+ elif args[0] == 'release-version':
+ return 'cloud:trusty-kilo'
+
+
+previous_node_start_time = time.time() - (9 * 60)
+
+
+def monitor_key_side_effect(*args):
+ if args[1] == \
+ 'ip-192-168-1-2_done':
+ return False
+ elif args[1] == \
+ 'ip-192-168-1-2_start':
+ # Return that the previous node started 9 minutes ago
+ return previous_node_start_time
+
+
+class UpgradeRollingTestCase(test_utils.CharmTestCase):
+ def setUp(self):
+ super(UpgradeRollingTestCase, self).setUp(ceph_hooks, TO_PATCH)
+
+ @patch('ceph_hooks.roll_osd_cluster')
+ def test_check_for_upgrade(self, roll_osd_cluster):
+ self.host.lsb_release.return_value = {
+ 'DISTRIB_CODENAME': 'trusty',
+ }
+ previous_mock = MagicMock().return_value
+ previous_mock.previous.return_value = "cloud:trusty-juno"
+ self.hookenv.config.side_effect = [previous_mock,
+ config_side_effect('source')]
+ ceph_hooks.check_for_upgrade()
+
+ roll_osd_cluster.assert_called_with('cloud:trusty-kilo')
+
+ @patch('ceph_hooks.upgrade_osd')
+ @patch('ceph_hooks.monitor_key_set')
+ def test_lock_and_roll(self, monitor_key_set, upgrade_osd):
+ monitor_key_set.monitor_key_set.return_value = None
+ ceph_hooks.lock_and_roll(my_name='ip-192-168-1-2')
+ upgrade_osd.assert_called_once_with()
+
+ def test_upgrade_osd(self):
+ self.config.side_effect = config_side_effect
+ self.ceph.get_version.return_value = "0.80"
+ self.ceph.systemd.return_value = False
+ ceph_hooks.upgrade_osd()
+ self.service_stop.assert_called_with('ceph-osd-all')
+ self.service_start.assert_called_with('ceph-osd-all')
+ self.status_set.assert_has_calls([
+ call('maintenance', 'Upgrading osd'),
+ ])
+
+ @patch('ceph_hooks.lock_and_roll')
+ @patch('ceph_hooks.get_upgrade_position')
+ def test_roll_osd_cluster_first(self,
+ get_upgrade_position,
+ lock_and_roll):
+ self.socket.gethostname.return_value = "ip-192-168-1-2"
+ self.ceph.get_osd_tree.return_value = ""
+ get_upgrade_position.return_value = 0
+ ceph_hooks.roll_osd_cluster('0.94.1')
+ lock_and_roll.assert_called_with(my_name="ip-192-168-1-2")
+
+ @patch('ceph_hooks.lock_and_roll')
+ @patch('ceph_hooks.get_upgrade_position')
+ @patch('ceph_hooks.wait_on_previous_node')
+ def test_roll_osd_cluster_second(self,
+ wait_on_previous_node,
+ get_upgrade_position,
+ lock_and_roll):
+ wait_on_previous_node.return_value = None
+ self.socket.gethostname.return_value = "ip-192-168-1-3"
+ self.ceph.get_osd_tree.return_value = [
+ CrushLocation(
+ name="ip-192-168-1-2",
+ identifier='a',
+ host='host-a',
+ rack='rack-a',
+ row='row-a',
+ datacenter='dc-1',
+ chassis='chassis-a',
+ root='ceph'),
+ CrushLocation(
+ name="ip-192-168-1-3",
+ identifier='a',
+ host='host-b',
+ rack='rack-a',
+ row='row-a',
+ datacenter='dc-1',
+ chassis='chassis-a',
+ root='ceph')
+ ]
+ get_upgrade_position.return_value = 1
+ ceph_hooks.roll_osd_cluster('0.94.1')
+ self.status_set.assert_called_with(
+ 'blocked',
+ 'Waiting on ip-192-168-1-2 to finish upgrading')
+ lock_and_roll.assert_called_with(my_name="ip-192-168-1-3")
+
+ @patch('ceph_hooks.monitor_key_get')
+ @patch('ceph_hooks.monitor_key_exists')
+ def test_wait_on_previous_node(self,
+ monitor_key_exists,
+ monitor_key_get):
+ monitor_key_get.side_effect = monitor_key_side_effect
+ monitor_key_exists.return_value = False
+
+ ceph_hooks.wait_on_previous_node("ip-192-168-1-2")
+
+ # Make sure we checked to see if the previous node started
+ monitor_key_get.assert_has_calls(
+ [call('osd-upgrade', 'ip-192-168-1-2_start')]
+ )
+ # Make sure we checked to see if the previous node was finished
+ monitor_key_exists.assert_has_calls(
+ [call('osd-upgrade', 'ip-192-168-1-2_done')]
+ )
+ # Make sure we waited at last once before proceeding
+ self.log.assert_has_calls(
+ [call('Previous node is: ip-192-168-1-2')],
+ [call('ip-192-168-1-2 is not finished. Waiting')],
+ )