diff --git a/charm-helpers-hooks.yaml b/charm-helpers-hooks.yaml index cb5cbac0..c8c54766 100644 --- a/charm-helpers-hooks.yaml +++ b/charm-helpers-hooks.yaml @@ -5,7 +5,6 @@ include: - cli - fetch - contrib.storage.linux: - - ceph - utils - contrib.openstack.alternatives - contrib.network.ip diff --git a/hooks/ceph.py b/hooks/ceph.py index 0b23979b..51b06ac8 100644 --- a/hooks/ceph.py +++ b/hooks/ceph.py @@ -19,10 +19,11 @@ from charmhelpers.cli.host import mounts from charmhelpers.core.host import ( mkdir, chownr, + service_restart, cmp_pkgrevno, lsb_release, - service_stop, - service_restart) + service_stop +) from charmhelpers.core.hookenv import ( log, ERROR, @@ -57,112 +58,6 @@ def ceph_user(): return "root" -class CrushLocation(object): - def __init__(self, - name, - identifier, - host, - rack, - row, - datacenter, - chassis, - root): - self.name = name - self.identifier = identifier - self.host = host - self.rack = rack - self.row = row - self.datacenter = datacenter - self.chassis = chassis - self.root = root - - def __str__(self): - return "name: {} id: {} host: {} rack: {} row: {} datacenter: {} " \ - "chassis :{} root: {}".format(self.name, self.identifier, - self.host, self.rack, self.row, - self.datacenter, self.chassis, - self.root) - - def __eq__(self, other): - return not self.name < other.name and not other.name < self.name - - def __ne__(self, other): - return self.name < other.name or other.name < self.name - - def __gt__(self, other): - return self.name > other.name - - def __ge__(self, other): - return not self.name < other.name - - def __le__(self, other): - return self.name < other.name - - -def get_osd_tree(service): - """ - Returns the current osd map in JSON. - :return: List. :raise: ValueError if the monmap fails to parse. - Also raises CalledProcessError if our ceph command fails - """ - try: - tree = subprocess.check_output( - ['ceph', '--id', service, - 'osd', 'tree', '--format=json']) - try: - json_tree = json.loads(tree) - crush_list = [] - # Make sure children are present in the json - if not json_tree['nodes']: - return None - child_ids = json_tree['nodes'][0]['children'] - for child in json_tree['nodes']: - if child['id'] in child_ids: - crush_list.append( - CrushLocation( - name=child.get('name'), - identifier=child['id'], - host=child.get('host'), - rack=child.get('rack'), - row=child.get('row'), - datacenter=child.get('datacenter'), - chassis=child.get('chassis'), - root=child.get('root') - ) - ) - return crush_list - except ValueError as v: - log("Unable to parse ceph tree json: {}. Error: {}".format( - tree, v.message)) - raise - except subprocess.CalledProcessError as e: - log("ceph osd tree command failed with message: {}".format( - e.message)) - raise - - -def get_local_osd_ids(): - """ - This will list the /var/lib/ceph/osd/* directories and try - to split the ID off of the directory name and return it in - a list - - :return: list. A list of osd identifiers :raise: OSError if - something goes wrong with listing the directory. 
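The get_osd_tree() helper removed above filters the JSON from `ceph osd tree --format=json` down to the buckets listed as children of the root node. A minimal, standalone sketch of that filtering (the crush_hosts name and the 'osd-upgrade' default are illustrative, not from the charm):

import json
import subprocess


def crush_hosts(service='osd-upgrade'):
    """Names of the CRUSH buckets sitting directly under the root bucket."""
    tree = subprocess.check_output(
        ['ceph', '--id', service, 'osd', 'tree', '--format=json'])
    nodes = json.loads(tree.decode('utf-8'))['nodes']
    if not nodes:
        return []
    child_ids = set(nodes[0].get('children', []))
    return [node['name'] for node in nodes if node['id'] in child_ids]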
- """ - osd_ids = [] - osd_path = os.path.join(os.sep, 'var', 'lib', 'ceph', 'osd') - if os.path.exists(osd_path): - try: - dirs = os.listdir(osd_path) - for osd_dir in dirs: - osd_id = osd_dir.split('-')[1] - osd_ids.append(osd_id) - except OSError: - raise - return osd_ids - - def get_version(): '''Derive Ceph release from an installed package.''' import apt_pkg as apt @@ -413,7 +308,6 @@ def rescan_osd_devices(): _bootstrap_keyring = "/var/lib/ceph/bootstrap-osd/ceph.keyring" -_upgrade_keyring = "/var/lib/ceph/osd/ceph.client.osd-upgrade.keyring" def is_bootstrapped(): @@ -439,21 +333,6 @@ def import_osd_bootstrap_key(key): ] subprocess.check_call(cmd) - -def import_osd_upgrade_key(key): - if not os.path.exists(_upgrade_keyring): - cmd = [ - "sudo", - "-u", - ceph_user(), - 'ceph-authtool', - _upgrade_keyring, - '--create-keyring', - '--name=client.osd-upgrade', - '--add-key={}'.format(key) - ] - subprocess.check_call(cmd) - # OSD caps taken from ceph-create-keys _osd_bootstrap_caps = { 'mon': [ @@ -620,7 +499,7 @@ def update_monfs(): def maybe_zap_journal(journal_dev): - if is_osd_disk(journal_dev): + if (is_osd_disk(journal_dev)): log('Looks like {} is already an OSD data' ' or journal, skipping.'.format(journal_dev)) return @@ -664,7 +543,7 @@ def osdize_dev(dev, osd_format, osd_journal, reformat_osd=False, log('Path {} is not a block device - bailing'.format(dev)) return - if is_osd_disk(dev) and not reformat_osd: + if (is_osd_disk(dev) and not reformat_osd): log('Looks like {} is already an' ' OSD data or journal, skipping.'.format(dev)) return @@ -738,7 +617,7 @@ def filesystem_mounted(fs): def get_running_osds(): - """Returns a list of the pids of the current running OSD daemons""" + '''Returns a list of the pids of the current running OSD daemons''' cmd = ['pgrep', 'ceph-osd'] try: result = subprocess.check_output(cmd) diff --git a/hooks/ceph_hooks.py b/hooks/ceph_hooks.py index f31bbf52..912abc11 100755 --- a/hooks/ceph_hooks.py +++ b/hooks/ceph_hooks.py @@ -9,16 +9,12 @@ import glob import os -import random import shutil -import subprocess import sys import tempfile import socket -import time import ceph -from charmhelpers.core import hookenv from charmhelpers.core.hookenv import ( log, ERROR, @@ -35,8 +31,8 @@ from charmhelpers.core.hookenv import ( from charmhelpers.core.host import ( umount, mkdir, - cmp_pkgrevno, - service_stop, service_start) + cmp_pkgrevno +) from charmhelpers.fetch import ( add_source, apt_install, @@ -44,216 +40,24 @@ from charmhelpers.fetch import ( filter_installed_packages, ) from charmhelpers.core.sysctl import create as create_sysctl -from charmhelpers.core import host from utils import ( get_host_ip, get_networks, assert_charm_supports_ipv6, - render_template) + render_template, +) from charmhelpers.contrib.openstack.alternatives import install_alternative from charmhelpers.contrib.network.ip import ( get_ipv6_addr, format_ipv6_addr, ) -from charmhelpers.contrib.storage.linux.ceph import ( - monitor_key_set, - monitor_key_exists, - monitor_key_get) + from charmhelpers.contrib.charmsupport import nrpe hooks = Hooks() -# A dict of valid ceph upgrade paths. 
Mapping is old -> new -upgrade_paths = { - 'cloud:trusty-juno': 'cloud:trusty-kilo', - 'cloud:trusty-kilo': 'cloud:trusty-liberty', - 'cloud:trusty-liberty': 'cloud:trusty-mitaka', -} - - -def pretty_print_upgrade_paths(): - lines = [] - for key, value in upgrade_paths.iteritems(): - lines.append("{} -> {}".format(key, value)) - return lines - - -def check_for_upgrade(): - release_info = host.lsb_release() - if not release_info['DISTRIB_CODENAME'] == 'trusty': - log("Invalid upgrade path from {}. Only trusty is currently " - "supported".format(release_info['DISTRIB_CODENAME'])) - return - - c = hookenv.config() - old_version = c.previous('source') - log('old_version: {}'.format(old_version)) - # Strip all whitespace - new_version = hookenv.config('source') - if new_version: - # replace all whitespace - new_version = new_version.replace(' ', '') - log('new_version: {}'.format(new_version)) - - if old_version in upgrade_paths: - if new_version == upgrade_paths[old_version]: - log("{} to {} is a valid upgrade path. Proceeding.".format( - old_version, new_version)) - roll_osd_cluster(new_version) - else: - # Log a helpful error message - log("Invalid upgrade path from {} to {}. " - "Valid paths are: {}".format(old_version, - new_version, - pretty_print_upgrade_paths())) - - -def lock_and_roll(my_name): - start_timestamp = time.time() - - log('monitor_key_set {}_start {}'.format(my_name, start_timestamp)) - monitor_key_set('osd-upgrade', "{}_start".format(my_name), start_timestamp) - log("Rolling") - # This should be quick - upgrade_osd() - log("Done") - - stop_timestamp = time.time() - # Set a key to inform others I am finished - log('monitor_key_set {}_done {}'.format(my_name, stop_timestamp)) - monitor_key_set('osd-upgrade', "{}_done".format(my_name), stop_timestamp) - - -def wait_on_previous_node(previous_node): - log("Previous node is: {}".format(previous_node)) - - previous_node_finished = monitor_key_exists( - 'osd-upgrade', - "{}_done".format(previous_node)) - - while previous_node_finished is False: - log("{} is not finished. Waiting".format(previous_node)) - # Has this node been trying to upgrade for longer than - # 10 minutes? - # If so then move on and consider that node dead. - - # NOTE: This assumes the clusters clocks are somewhat accurate - # If the hosts clock is really far off it may cause it to skip - # the previous node even though it shouldn't. - current_timestamp = time.time() - previous_node_start_time = monitor_key_get( - 'osd-upgrade', - "{}_start".format(previous_node)) - if (current_timestamp - (10 * 60)) > previous_node_start_time: - # Previous node is probably dead. Lets move on - if previous_node_start_time is not None: - log( - "Waited 10 mins on node {}. current time: {} > " - "previous node start time: {} Moving on".format( - previous_node, - (current_timestamp - (10 * 60)), - previous_node_start_time)) - return - else: - # I have to wait. Sleep a random amount of time and then - # check if I can lock,upgrade and roll. - wait_time = random.randrange(5, 30) - log('waiting for {} seconds'.format(wait_time)) - time.sleep(wait_time) - previous_node_finished = monitor_key_exists( - 'osd-upgrade', - "{}_done".format(previous_node)) - - -def get_upgrade_position(osd_sorted_list, match_name): - for index, item in enumerate(osd_sorted_list): - if item.name == match_name: - return index - return None - - -# Edge cases: -# 1. Previous node dies on upgrade, can we retry? -# 2. This assumes that the osd failure domain is not set to osd. 
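The two helpers above coordinate the rolling restart through `ceph config-key` entries: each host publishes `<hostname>_start` before it rolls and `<hostname>_done` afterwards, and a follower polls for its predecessor's done key, giving up once the predecessor's start stamp is more than ten minutes old. A condensed sketch of that wait loop, assuming the charmhelpers monitor_key_* helpers (which this patch also deletes from the charm's vendored copy) are importable:

import random
import time

from charmhelpers.contrib.storage.linux.ceph import (
    monitor_key_exists,
    monitor_key_get,
)


def wait_for_predecessor(previous_node, timeout=10 * 60):
    """Block until previous_node reports done, or its upgrade looks stale."""
    while not monitor_key_exists('osd-upgrade',
                                 '{}_done'.format(previous_node)):
        started = monitor_key_get('osd-upgrade',
                                  '{}_start'.format(previous_node))
        if started and time.time() - float(started) > timeout:
            # Predecessor probably died mid-upgrade; stop waiting.
            return
        time.sleep(random.randrange(5, 30))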
-# It rolls an entire server at a time. -def roll_osd_cluster(new_version): - """ - This is tricky to get right so here's what we're going to do. - There's 2 possible cases: Either I'm first in line or not. - If I'm not first in line I'll wait a random time between 5-30 seconds - and test to see if the previous osd is upgraded yet. - - TODO: If you're not in the same failure domain it's safe to upgrade - 1. Examine all pools and adopt the most strict failure domain policy - Example: Pool 1: Failure domain = rack - Pool 2: Failure domain = host - Pool 3: Failure domain = row - - outcome: Failure domain = host - """ - log('roll_osd_cluster called with {}'.format(new_version)) - my_name = socket.gethostname() - osd_tree = ceph.get_osd_tree(service='osd-upgrade') - # A sorted list of osd unit names - osd_sorted_list = sorted(osd_tree) - log("osd_sorted_list: {}".format(osd_sorted_list)) - - try: - position = get_upgrade_position(osd_sorted_list, my_name) - log("upgrade position: {}".format(position)) - if position == 0: - # I'm first! Roll - # First set a key to inform others I'm about to roll - lock_and_roll(my_name=my_name) - else: - # Check if the previous node has finished - status_set('blocked', - 'Waiting on {} to finish upgrading'.format( - osd_sorted_list[position - 1].name)) - wait_on_previous_node( - previous_node=osd_sorted_list[position - 1].name) - lock_and_roll(my_name=my_name) - except ValueError: - log("Failed to find name {} in list {}".format( - my_name, osd_sorted_list)) - status_set('blocked', 'failed to upgrade osd') - - -def upgrade_osd(): - current_version = ceph.get_version() - status_set("maintenance", "Upgrading osd") - log("Current ceph version is {}".format(current_version)) - new_version = config('release-version') - log("Upgrading to: {}".format(new_version)) - - try: - add_source(config('source'), config('key')) - apt_update(fatal=True) - except subprocess.CalledProcessError as err: - log("Adding the ceph source failed with message: {}".format( - err.message)) - status_set("blocked", "Upgrade to {} failed".format(new_version)) - sys.exit(1) - try: - if ceph.systemd(): - for osd_id in ceph.get_local_osd_ids(): - service_stop('ceph-osd@{}'.format(osd_id)) - else: - service_stop('ceph-osd-all') - apt_install(packages=ceph.PACKAGES, fatal=True) - if ceph.systemd(): - for osd_id in ceph.get_local_osd_ids(): - service_start('ceph-osd@{}'.format(osd_id)) - else: - service_start('ceph-osd-all') - except subprocess.CalledProcessError as err: - log("Stopping ceph and upgrading packages failed " - "with message: {}".format(err.message)) - status_set("blocked", "Upgrade to {} failed".format(new_version)) - sys.exit(1) - def install_upstart_scripts(): # Only install upstart configurations for older versions @@ -320,7 +124,6 @@ def emit_cephconf(): install_alternative('ceph.conf', '/etc/ceph/ceph.conf', charm_ceph_conf, 90) - JOURNAL_ZAPPED = '/var/lib/ceph/journal_zapped' @@ -355,9 +158,6 @@ def check_overlap(journaldevs, datadevs): @hooks.hook('config-changed') def config_changed(): - # Check if an upgrade was requested - check_for_upgrade() - # Pre-flight checks if config('osd-format') not in ceph.DISK_FORMATS: log('Invalid OSD disk format configuration specified', level=ERROR) @@ -371,7 +171,7 @@ def config_changed(): create_sysctl(sysctl_dict, '/etc/sysctl.d/50-ceph-osd-charm.conf') e_mountpoint = config('ephemeral-unmount') - if e_mountpoint and ceph.filesystem_mounted(e_mountpoint): + if (e_mountpoint and ceph.filesystem_mounted(e_mountpoint)): umount(e_mountpoint) 
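The removed upgrade_osd() wraps the package upgrade in a stop/start of the local OSD services: one ceph-osd@<id> unit per OSD on systemd releases, the aggregate ceph-osd-all job on upstart. A reduced sketch of that dance; it leans on ceph.systemd() and on the ceph.get_local_osd_ids() helper this patch also deletes:

from charmhelpers.core.host import service_start, service_stop

import ceph  # the charm's hooks/ceph.py


def restart_osds_around(upgrade):
    """Stop the local OSDs, run the supplied upgrade callable, start them."""
    if ceph.systemd():
        osd_services = ['ceph-osd@{}'.format(osd_id)
                        for osd_id in ceph.get_local_osd_ids()]
    else:
        osd_services = ['ceph-osd-all']
    for svc in osd_services:
        service_stop(svc)
    upgrade()   # e.g. apt_install(packages=ceph.PACKAGES, fatal=True)
    for svc in osd_services:
        service_start(svc)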
prepare_disks_and_activate() @@ -401,14 +201,8 @@ def get_mon_hosts(): hosts = [] for relid in relation_ids('mon'): for unit in related_units(relid): - addr = \ - relation_get('ceph-public-address', - unit, - relid) or get_host_ip( - relation_get( - 'private-address', - unit, - relid)) + addr = relation_get('ceph-public-address', unit, relid) or \ + get_host_ip(relation_get('private-address', unit, relid)) if addr: hosts.append('{}:6789'.format(format_ipv6_addr(addr) or addr)) @@ -464,12 +258,10 @@ def get_journal_devices(): 'mon-relation-departed') def mon_relation(): bootstrap_key = relation_get('osd_bootstrap_key') - upgrade_key = relation_get('osd_upgrade_key') if get_fsid() and get_auth() and bootstrap_key: log('mon has provided conf- scanning disks') emit_cephconf() ceph.import_osd_bootstrap_key(bootstrap_key) - ceph.import_osd_upgrade_key(upgrade_key) prepare_disks_and_activate() else: log('mon cluster has not yet provided conf') diff --git a/hooks/charmhelpers/contrib/storage/linux/ceph.py b/hooks/charmhelpers/contrib/storage/linux/ceph.py deleted file mode 100644 index f4582545..00000000 --- a/hooks/charmhelpers/contrib/storage/linux/ceph.py +++ /dev/null @@ -1,1195 +0,0 @@ -# Copyright 2014-2015 Canonical Limited. -# -# This file is part of charm-helpers. -# -# charm-helpers is free software: you can redistribute it and/or modify -# it under the terms of the GNU Lesser General Public License version 3 as -# published by the Free Software Foundation. -# -# charm-helpers is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public License -# along with charm-helpers. If not, see . - -# -# Copyright 2012 Canonical Ltd. -# -# This file is sourced from lp:openstack-charm-helpers -# -# Authors: -# James Page -# Adam Gandelman -# -import bisect -import errno -import hashlib -import six - -import os -import shutil -import json -import time -import uuid - -from subprocess import ( - check_call, - check_output, - CalledProcessError, -) -from charmhelpers.core.hookenv import ( - local_unit, - relation_get, - relation_ids, - relation_set, - related_units, - log, - DEBUG, - INFO, - WARNING, - ERROR, -) -from charmhelpers.core.host import ( - mount, - mounts, - service_start, - service_stop, - service_running, - umount, -) -from charmhelpers.fetch import ( - apt_install, -) - -from charmhelpers.core.kernel import modprobe - -KEYRING = '/etc/ceph/ceph.client.{}.keyring' -KEYFILE = '/etc/ceph/ceph.client.{}.key' - -CEPH_CONF = """[global] -auth supported = {auth} -keyring = {keyring} -mon host = {mon_hosts} -log to syslog = {use_syslog} -err to syslog = {use_syslog} -clog to syslog = {use_syslog} -""" -# For 50 < osds < 240,000 OSDs (Roughly 1 Exabyte at 6T OSDs) -powers_of_two = [8192, 16384, 32768, 65536, 131072, 262144, 524288, 1048576, 2097152, 4194304, 8388608] - - -def validator(value, valid_type, valid_range=None): - """ - Used to validate these: http://docs.ceph.com/docs/master/rados/operations/pools/#set-pool-values - Example input: - validator(value=1, - valid_type=int, - valid_range=[0, 2]) - This says I'm testing value=1. It must be an int inclusive in [0,2] - - :param value: The value to validate - :param valid_type: The type that value should be. - :param valid_range: A range of values that value can assume. 
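Back in the get_mon_hosts() hunk above, the tidied one-liner relies on format_ipv6_addr() returning a bracketed address for IPv6 input and None for anything else, so `or addr` keeps plain IPv4 addresses and hostnames as-is. The same idea written out without charmhelpers (mon_endpoint is an illustrative name):

import socket


def mon_endpoint(addr, port=6789):
    """Join an address and port, bracketing IPv6 literals."""
    try:
        socket.inet_pton(socket.AF_INET6, addr)
        host = '[{}]'.format(addr)
    except socket.error:
        host = addr
    return '{}:{}'.format(host, port)


# mon_endpoint('10.0.0.1')    -> '10.0.0.1:6789'
# mon_endpoint('2001:db8::1') -> '[2001:db8::1]:6789'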
- :return: - """ - assert isinstance(value, valid_type), "{} is not a {}".format( - value, - valid_type) - if valid_range is not None: - assert isinstance(valid_range, list), \ - "valid_range must be a list, was given {}".format(valid_range) - # If we're dealing with strings - if valid_type is six.string_types: - assert value in valid_range, \ - "{} is not in the list {}".format(value, valid_range) - # Integer, float should have a min and max - else: - if len(valid_range) != 2: - raise ValueError( - "Invalid valid_range list of {} for {}. " - "List must be [min,max]".format(valid_range, value)) - assert value >= valid_range[0], \ - "{} is less than minimum allowed value of {}".format( - value, valid_range[0]) - assert value <= valid_range[1], \ - "{} is greater than maximum allowed value of {}".format( - value, valid_range[1]) - - -class PoolCreationError(Exception): - """ - A custom error to inform the caller that a pool creation failed. Provides an error message - """ - - def __init__(self, message): - super(PoolCreationError, self).__init__(message) - - -class Pool(object): - """ - An object oriented approach to Ceph pool creation. This base class is inherited by ReplicatedPool and ErasurePool. - Do not call create() on this base class as it will not do anything. Instantiate a child class and call create(). - """ - - def __init__(self, service, name): - self.service = service - self.name = name - - # Create the pool if it doesn't exist already - # To be implemented by subclasses - def create(self): - pass - - def add_cache_tier(self, cache_pool, mode): - """ - Adds a new cache tier to an existing pool. - :param cache_pool: six.string_types. The cache tier pool name to add. - :param mode: six.string_types. The caching mode to use for this pool. valid range = ["readonly", "writeback"] - :return: None - """ - # Check the input types and values - validator(value=cache_pool, valid_type=six.string_types) - validator(value=mode, valid_type=six.string_types, valid_range=["readonly", "writeback"]) - - check_call(['ceph', '--id', self.service, 'osd', 'tier', 'add', self.name, cache_pool]) - check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, mode]) - check_call(['ceph', '--id', self.service, 'osd', 'tier', 'set-overlay', self.name, cache_pool]) - check_call(['ceph', '--id', self.service, 'osd', 'pool', 'set', cache_pool, 'hit_set_type', 'bloom']) - - def remove_cache_tier(self, cache_pool): - """ - Removes a cache tier from Ceph. Flushes all dirty objects from writeback pools and waits for that to complete. - :param cache_pool: six.string_types. The cache tier pool name to remove. - :return: None - """ - # read-only is easy, writeback is much harder - mode = get_cache_mode(self.service, cache_pool) - if mode == 'readonly': - check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'none']) - check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool]) - - elif mode == 'writeback': - check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'forward']) - # Flush the cache and wait for it to return - check_call(['ceph', '--id', self.service, '-p', cache_pool, 'cache-flush-evict-all']) - check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove-overlay', self.name]) - check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool]) - - def get_pgs(self, pool_size): - """ - :param pool_size: int. 
pool_size is either the number of replicas for replicated pools or the K+M sum for - erasure coded pools - :return: int. The number of pgs to use. - """ - validator(value=pool_size, valid_type=int) - osd_list = get_osds(self.service) - if not osd_list: - # NOTE(james-page): Default to 200 for older ceph versions - # which don't support OSD query from cli - return 200 - - osd_list_length = len(osd_list) - # Calculate based on Ceph best practices - if osd_list_length < 5: - return 128 - elif 5 < osd_list_length < 10: - return 512 - elif 10 < osd_list_length < 50: - return 4096 - else: - estimate = (osd_list_length * 100) / pool_size - # Return the next nearest power of 2 - index = bisect.bisect_right(powers_of_two, estimate) - return powers_of_two[index] - - -class ReplicatedPool(Pool): - def __init__(self, service, name, pg_num=None, replicas=2): - super(ReplicatedPool, self).__init__(service=service, name=name) - self.replicas = replicas - if pg_num is None: - self.pg_num = self.get_pgs(self.replicas) - else: - self.pg_num = pg_num - - def create(self): - if not pool_exists(self.service, self.name): - # Create it - cmd = ['ceph', '--id', self.service, 'osd', 'pool', 'create', - self.name, str(self.pg_num)] - try: - check_call(cmd) - except CalledProcessError: - raise - - -# Default jerasure erasure coded pool -class ErasurePool(Pool): - def __init__(self, service, name, erasure_code_profile="default"): - super(ErasurePool, self).__init__(service=service, name=name) - self.erasure_code_profile = erasure_code_profile - - def create(self): - if not pool_exists(self.service, self.name): - # Try to find the erasure profile information so we can properly size the pgs - erasure_profile = get_erasure_profile(service=self.service, name=self.erasure_code_profile) - - # Check for errors - if erasure_profile is None: - log(message='Failed to discover erasure_profile named={}'.format(self.erasure_code_profile), - level=ERROR) - raise PoolCreationError(message='unable to find erasure profile {}'.format(self.erasure_code_profile)) - if 'k' not in erasure_profile or 'm' not in erasure_profile: - # Error - log(message='Unable to find k (data chunks) or m (coding chunks) in {}'.format(erasure_profile), - level=ERROR) - raise PoolCreationError( - message='unable to find k (data chunks) or m (coding chunks) in {}'.format(erasure_profile)) - - pgs = self.get_pgs(int(erasure_profile['k']) + int(erasure_profile['m'])) - # Create it - cmd = ['ceph', '--id', self.service, 'osd', 'pool', 'create', self.name, str(pgs), str(pgs), - 'erasure', self.erasure_code_profile] - try: - check_call(cmd) - except CalledProcessError: - raise - - """Get an existing erasure code profile if it already exists. - Returns json formatted output""" - - -def get_mon_map(service): - """ - Returns the current monitor map. - :param service: six.string_types. The Ceph user name to run the command under - :return: json string. :raise: ValueError if the monmap fails to parse. - Also raises CalledProcessError if our ceph command fails - """ - try: - mon_status = check_output( - ['ceph', '--id', service, - 'mon_status', '--format=json']) - try: - return json.loads(mon_status) - except ValueError as v: - log("Unable to parse mon_status json: {}. Error: {}".format( - mon_status, v.message)) - raise - except CalledProcessError as e: - log("mon_status command failed with message: {}".format( - e.message)) - raise - - -def hash_monitor_names(service): - """ - Uses the get_mon_map() function to get information about the monitor - cluster. 
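The get_pgs() method above encodes the charm's placement-group sizing rule: fixed buckets up to 50 OSDs, then roughly 100 PGs per OSD divided by the pool size, rounded up to the next entry in powers_of_two. A pure-function restatement with a few worked values (suggested_pg_num is an illustrative name, not part of the library):

import bisect

# Same table as the module-level powers_of_two above (starts at 8192).
POWERS_OF_TWO = [8192, 16384, 32768, 65536, 131072, 262144, 524288,
                 1048576, 2097152, 4194304, 8388608]


def suggested_pg_num(osd_count, pool_size):
    """pool_size is the replica count (or k+m for erasure coded pools)."""
    if osd_count < 5:
        return 128
    elif 5 < osd_count < 10:
        return 512
    elif 10 < osd_count < 50:
        return 4096
    estimate = (osd_count * 100) / pool_size
    return POWERS_OF_TWO[bisect.bisect_right(POWERS_OF_TWO, estimate)]


# suggested_pg_num(3, 3)   -> 128
# suggested_pg_num(30, 3)  -> 4096
# suggested_pg_num(300, 3) -> 16384  (estimate 10000, next listed power of 2)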
- Hash the name of each monitor. Return a sorted list of monitor hashes - in an ascending order. - :param service: six.string_types. The Ceph user name to run the command under - :rtype : dict. json dict of monitor name, ip address and rank - example: { - 'name': 'ip-172-31-13-165', - 'rank': 0, - 'addr': '172.31.13.165:6789/0'} - """ - try: - hash_list = [] - monitor_list = get_mon_map(service=service) - if monitor_list['monmap']['mons']: - for mon in monitor_list['monmap']['mons']: - hash_list.append( - hashlib.sha224(mon['name'].encode('utf-8')).hexdigest()) - return sorted(hash_list) - else: - return None - except (ValueError, CalledProcessError): - raise - - -def monitor_key_delete(service, key): - """ - Delete a key and value pair from the monitor cluster - :param service: six.string_types. The Ceph user name to run the command under - Deletes a key value pair on the monitor cluster. - :param key: six.string_types. The key to delete. - """ - try: - check_output( - ['ceph', '--id', service, - 'config-key', 'del', str(key)]) - except CalledProcessError as e: - log("Monitor config-key put failed with message: {}".format( - e.output)) - raise - - -def monitor_key_set(service, key, value): - """ - Sets a key value pair on the monitor cluster. - :param service: six.string_types. The Ceph user name to run the command under - :param key: six.string_types. The key to set. - :param value: The value to set. This will be converted to a string - before setting - """ - try: - check_output( - ['ceph', '--id', service, - 'config-key', 'put', str(key), str(value)]) - except CalledProcessError as e: - log("Monitor config-key put failed with message: {}".format( - e.output)) - raise - - -def monitor_key_get(service, key): - """ - Gets the value of an existing key in the monitor cluster. - :param service: six.string_types. The Ceph user name to run the command under - :param key: six.string_types. The key to search for. - :return: Returns the value of that key or None if not found. - """ - try: - output = check_output( - ['ceph', '--id', service, - 'config-key', 'get', str(key)]) - return output - except CalledProcessError as e: - log("Monitor config-key get failed with message: {}".format( - e.output)) - return None - - -def monitor_key_exists(service, key): - """ - Searches for the existence of a key in the monitor cluster. - :param service: six.string_types. The Ceph user name to run the command under - :param key: six.string_types. The key to search for - :return: Returns True if the key exists, False if not and raises an - exception if an unknown error occurs. :raise: CalledProcessError if - an unknown error occurs - """ - try: - check_call( - ['ceph', '--id', service, - 'config-key', 'exists', str(key)]) - # I can return true here regardless because Ceph returns - # ENOENT if the key wasn't found - return True - except CalledProcessError as e: - if e.returncode == errno.ENOENT: - return False - else: - log("Unknown error from ceph config-get exists: {} {}".format( - e.returncode, e.output)) - raise - - -def get_erasure_profile(service, name): - """ - :param service: six.string_types. The Ceph user name to run the command under - :param name: - :return: - """ - try: - out = check_output(['ceph', '--id', service, - 'osd', 'erasure-code-profile', 'get', - name, '--format=json']) - return json.loads(out) - except (CalledProcessError, OSError, ValueError): - return None - - -def pool_set(service, pool_name, key, value): - """ - Sets a value for a RADOS pool in ceph. - :param service: six.string_types. 
The Ceph user name to run the command under - :param pool_name: six.string_types - :param key: six.string_types - :param value: - :return: None. Can raise CalledProcessError - """ - cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', pool_name, key, value] - try: - check_call(cmd) - except CalledProcessError: - raise - - -def snapshot_pool(service, pool_name, snapshot_name): - """ - Snapshots a RADOS pool in ceph. - :param service: six.string_types. The Ceph user name to run the command under - :param pool_name: six.string_types - :param snapshot_name: six.string_types - :return: None. Can raise CalledProcessError - """ - cmd = ['ceph', '--id', service, 'osd', 'pool', 'mksnap', pool_name, snapshot_name] - try: - check_call(cmd) - except CalledProcessError: - raise - - -def remove_pool_snapshot(service, pool_name, snapshot_name): - """ - Remove a snapshot from a RADOS pool in ceph. - :param service: six.string_types. The Ceph user name to run the command under - :param pool_name: six.string_types - :param snapshot_name: six.string_types - :return: None. Can raise CalledProcessError - """ - cmd = ['ceph', '--id', service, 'osd', 'pool', 'rmsnap', pool_name, snapshot_name] - try: - check_call(cmd) - except CalledProcessError: - raise - - -# max_bytes should be an int or long -def set_pool_quota(service, pool_name, max_bytes): - """ - :param service: six.string_types. The Ceph user name to run the command under - :param pool_name: six.string_types - :param max_bytes: int or long - :return: None. Can raise CalledProcessError - """ - # Set a byte quota on a RADOS pool in ceph. - cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name, - 'max_bytes', str(max_bytes)] - try: - check_call(cmd) - except CalledProcessError: - raise - - -def remove_pool_quota(service, pool_name): - """ - Set a byte quota on a RADOS pool in ceph. - :param service: six.string_types. The Ceph user name to run the command under - :param pool_name: six.string_types - :return: None. Can raise CalledProcessError - """ - cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name, 'max_bytes', '0'] - try: - check_call(cmd) - except CalledProcessError: - raise - - -def remove_erasure_profile(service, profile_name): - """ - Create a new erasure code profile if one does not already exist for it. Updates - the profile if it exists. Please see http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/ - for more details - :param service: six.string_types. The Ceph user name to run the command under - :param profile_name: six.string_types - :return: None. Can raise CalledProcessError - """ - cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'rm', - profile_name] - try: - check_call(cmd) - except CalledProcessError: - raise - - -def create_erasure_profile(service, profile_name, erasure_plugin_name='jerasure', - failure_domain='host', - data_chunks=2, coding_chunks=1, - locality=None, durability_estimator=None): - """ - Create a new erasure code profile if one does not already exist for it. Updates - the profile if it exists. Please see http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/ - for more details - :param service: six.string_types. The Ceph user name to run the command under - :param profile_name: six.string_types - :param erasure_plugin_name: six.string_types - :param failure_domain: six.string_types. 
One of ['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region', - 'room', 'root', 'row']) - :param data_chunks: int - :param coding_chunks: int - :param locality: int - :param durability_estimator: int - :return: None. Can raise CalledProcessError - """ - # Ensure this failure_domain is allowed by Ceph - validator(failure_domain, six.string_types, - ['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region', 'room', 'root', 'row']) - - cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'set', profile_name, - 'plugin=' + erasure_plugin_name, 'k=' + str(data_chunks), 'm=' + str(coding_chunks), - 'ruleset_failure_domain=' + failure_domain] - if locality is not None and durability_estimator is not None: - raise ValueError("create_erasure_profile should be called with k, m and one of l or c but not both.") - - # Add plugin specific information - if locality is not None: - # For local erasure codes - cmd.append('l=' + str(locality)) - if durability_estimator is not None: - # For Shec erasure codes - cmd.append('c=' + str(durability_estimator)) - - if erasure_profile_exists(service, profile_name): - cmd.append('--force') - - try: - check_call(cmd) - except CalledProcessError: - raise - - -def rename_pool(service, old_name, new_name): - """ - Rename a Ceph pool from old_name to new_name - :param service: six.string_types. The Ceph user name to run the command under - :param old_name: six.string_types - :param new_name: six.string_types - :return: None - """ - validator(value=old_name, valid_type=six.string_types) - validator(value=new_name, valid_type=six.string_types) - - cmd = ['ceph', '--id', service, 'osd', 'pool', 'rename', old_name, new_name] - check_call(cmd) - - -def erasure_profile_exists(service, name): - """ - Check to see if an Erasure code profile already exists. - :param service: six.string_types. The Ceph user name to run the command under - :param name: six.string_types - :return: int or None - """ - validator(value=name, valid_type=six.string_types) - try: - check_call(['ceph', '--id', service, - 'osd', 'erasure-code-profile', 'get', - name]) - return True - except CalledProcessError: - return False - - -def get_cache_mode(service, pool_name): - """ - Find the current caching mode of the pool_name given. - :param service: six.string_types. The Ceph user name to run the command under - :param pool_name: six.string_types - :return: int or None - """ - validator(value=service, valid_type=six.string_types) - validator(value=pool_name, valid_type=six.string_types) - out = check_output(['ceph', '--id', service, 'osd', 'dump', '--format=json']) - try: - osd_json = json.loads(out) - for pool in osd_json['pools']: - if pool['pool_name'] == pool_name: - return pool['cache_mode'] - return None - except ValueError: - raise - - -def pool_exists(service, name): - """Check to see if a RADOS pool already exists.""" - try: - out = check_output(['rados', '--id', service, - 'lspools']).decode('UTF-8') - except CalledProcessError: - return False - - return name in out - - -def get_osds(service): - """Return a list of all Ceph Object Storage Daemons currently in the - cluster. 
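For reference, a call into the deleted create_erasure_profile() above for a 4+2 jerasure profile with a host failure domain looks like this (the profile name and cephx user are placeholders, and the call shells out to the ceph CLI, so it needs a reachable cluster):

from charmhelpers.contrib.storage.linux.ceph import create_erasure_profile

create_erasure_profile(service='admin',
                       profile_name='jerasure-4-2',
                       erasure_plugin_name='jerasure',
                       failure_domain='host',
                       data_chunks=4,
                       coding_chunks=2)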
- """ - version = ceph_version() - if version and version >= '0.56': - return json.loads(check_output(['ceph', '--id', service, - 'osd', 'ls', - '--format=json']).decode('UTF-8')) - - return None - - -def install(): - """Basic Ceph client installation.""" - ceph_dir = "/etc/ceph" - if not os.path.exists(ceph_dir): - os.mkdir(ceph_dir) - - apt_install('ceph-common', fatal=True) - - -def rbd_exists(service, pool, rbd_img): - """Check to see if a RADOS block device exists.""" - try: - out = check_output(['rbd', 'list', '--id', - service, '--pool', pool]).decode('UTF-8') - except CalledProcessError: - return False - - return rbd_img in out - - -def create_rbd_image(service, pool, image, sizemb): - """Create a new RADOS block device.""" - cmd = ['rbd', 'create', image, '--size', str(sizemb), '--id', service, - '--pool', pool] - check_call(cmd) - - -def update_pool(client, pool, settings): - cmd = ['ceph', '--id', client, 'osd', 'pool', 'set', pool] - for k, v in six.iteritems(settings): - cmd.append(k) - cmd.append(v) - - check_call(cmd) - - -def create_pool(service, name, replicas=3, pg_num=None): - """Create a new RADOS pool.""" - if pool_exists(service, name): - log("Ceph pool {} already exists, skipping creation".format(name), - level=WARNING) - return - - if not pg_num: - # Calculate the number of placement groups based - # on upstream recommended best practices. - osds = get_osds(service) - if osds: - pg_num = (len(osds) * 100 // replicas) - else: - # NOTE(james-page): Default to 200 for older ceph versions - # which don't support OSD query from cli - pg_num = 200 - - cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pg_num)] - check_call(cmd) - - update_pool(service, name, settings={'size': str(replicas)}) - - -def delete_pool(service, name): - """Delete a RADOS pool from ceph.""" - cmd = ['ceph', '--id', service, 'osd', 'pool', 'delete', name, - '--yes-i-really-really-mean-it'] - check_call(cmd) - - -def _keyfile_path(service): - return KEYFILE.format(service) - - -def _keyring_path(service): - return KEYRING.format(service) - - -def create_keyring(service, key): - """Create a new Ceph keyring containing key.""" - keyring = _keyring_path(service) - if os.path.exists(keyring): - log('Ceph keyring exists at %s.' % keyring, level=WARNING) - return - - cmd = ['ceph-authtool', keyring, '--create-keyring', - '--name=client.{}'.format(service), '--add-key={}'.format(key)] - check_call(cmd) - log('Created new ceph keyring at %s.' % keyring, level=DEBUG) - - -def delete_keyring(service): - """Delete an existing Ceph keyring.""" - keyring = _keyring_path(service) - if not os.path.exists(keyring): - log('Keyring does not exist at %s' % keyring, level=WARNING) - return - - os.remove(keyring) - log('Deleted ring at %s.' % keyring, level=INFO) - - -def create_key_file(service, key): - """Create a file containing key.""" - keyfile = _keyfile_path(service) - if os.path.exists(keyfile): - log('Keyfile exists at %s.' % keyfile, level=WARNING) - return - - with open(keyfile, 'w') as fd: - fd.write(key) - - log('Created new keyfile at %s.' 
% keyfile, level=INFO) - - -def get_ceph_nodes(relation='ceph'): - """Query named relation to determine current nodes.""" - hosts = [] - for r_id in relation_ids(relation): - for unit in related_units(r_id): - hosts.append(relation_get('private-address', unit=unit, rid=r_id)) - - return hosts - - -def configure(service, key, auth, use_syslog): - """Perform basic configuration of Ceph.""" - create_keyring(service, key) - create_key_file(service, key) - hosts = get_ceph_nodes() - with open('/etc/ceph/ceph.conf', 'w') as ceph_conf: - ceph_conf.write(CEPH_CONF.format(auth=auth, - keyring=_keyring_path(service), - mon_hosts=",".join(map(str, hosts)), - use_syslog=use_syslog)) - modprobe('rbd') - - -def image_mapped(name): - """Determine whether a RADOS block device is mapped locally.""" - try: - out = check_output(['rbd', 'showmapped']).decode('UTF-8') - except CalledProcessError: - return False - - return name in out - - -def map_block_storage(service, pool, image): - """Map a RADOS block device for local use.""" - cmd = [ - 'rbd', - 'map', - '{}/{}'.format(pool, image), - '--user', - service, - '--secret', - _keyfile_path(service), - ] - check_call(cmd) - - -def filesystem_mounted(fs): - """Determine whether a filesytems is already mounted.""" - return fs in [f for f, m in mounts()] - - -def make_filesystem(blk_device, fstype='ext4', timeout=10): - """Make a new filesystem on the specified block device.""" - count = 0 - e_noent = os.errno.ENOENT - while not os.path.exists(blk_device): - if count >= timeout: - log('Gave up waiting on block device %s' % blk_device, - level=ERROR) - raise IOError(e_noent, os.strerror(e_noent), blk_device) - - log('Waiting for block device %s to appear' % blk_device, - level=DEBUG) - count += 1 - time.sleep(1) - else: - log('Formatting block device %s as filesystem %s.' % - (blk_device, fstype), level=INFO) - check_call(['mkfs', '-t', fstype, blk_device]) - - -def place_data_on_block_device(blk_device, data_src_dst): - """Migrate data in data_src_dst to blk_device and then remount.""" - # mount block device into /mnt - mount(blk_device, '/mnt') - # copy data to /mnt - copy_files(data_src_dst, '/mnt') - # umount block device - umount('/mnt') - # Grab user/group ID's from original source - _dir = os.stat(data_src_dst) - uid = _dir.st_uid - gid = _dir.st_gid - # re-mount where the data should originally be - # TODO: persist is currently a NO-OP in core.host - mount(blk_device, data_src_dst, persist=True) - # ensure original ownership of new mount. - os.chown(data_src_dst, uid, gid) - - -def copy_files(src, dst, symlinks=False, ignore=None): - """Copy files from src to dst.""" - for item in os.listdir(src): - s = os.path.join(src, item) - d = os.path.join(dst, item) - if os.path.isdir(s): - shutil.copytree(s, d, symlinks, ignore) - else: - shutil.copy2(s, d) - - -def ensure_ceph_storage(service, pool, rbd_img, sizemb, mount_point, - blk_device, fstype, system_services=[], - replicas=3): - """NOTE: This function must only be called from a single service unit for - the same rbd_img otherwise data loss will occur. - - Ensures given pool and RBD image exists, is mapped to a block device, - and the device is formatted and mounted at the given mount_point. - - If formatting a device for the first time, data existing at mount_point - will be migrated to the RBD device before being re-mounted. - - All services listed in system_services will be stopped prior to data - migration and restarted when complete. - """ - # Ensure pool, RBD image, RBD mappings are in place. 
- if not pool_exists(service, pool): - log('Creating new pool {}.'.format(pool), level=INFO) - create_pool(service, pool, replicas=replicas) - - if not rbd_exists(service, pool, rbd_img): - log('Creating RBD image ({}).'.format(rbd_img), level=INFO) - create_rbd_image(service, pool, rbd_img, sizemb) - - if not image_mapped(rbd_img): - log('Mapping RBD Image {} as a Block Device.'.format(rbd_img), - level=INFO) - map_block_storage(service, pool, rbd_img) - - # make file system - # TODO: What happens if for whatever reason this is run again and - # the data is already in the rbd device and/or is mounted?? - # When it is mounted already, it will fail to make the fs - # XXX: This is really sketchy! Need to at least add an fstab entry - # otherwise this hook will blow away existing data if its executed - # after a reboot. - if not filesystem_mounted(mount_point): - make_filesystem(blk_device, fstype) - - for svc in system_services: - if service_running(svc): - log('Stopping services {} prior to migrating data.' - .format(svc), level=DEBUG) - service_stop(svc) - - place_data_on_block_device(blk_device, mount_point) - - for svc in system_services: - log('Starting service {} after migrating data.' - .format(svc), level=DEBUG) - service_start(svc) - - -def ensure_ceph_keyring(service, user=None, group=None, relation='ceph'): - """Ensures a ceph keyring is created for a named service and optionally - ensures user and group ownership. - - Returns False if no ceph key is available in relation state. - """ - key = None - for rid in relation_ids(relation): - for unit in related_units(rid): - key = relation_get('key', rid=rid, unit=unit) - if key: - break - - if not key: - return False - - create_keyring(service=service, key=key) - keyring = _keyring_path(service) - if user and group: - check_call(['chown', '%s.%s' % (user, group), keyring]) - - return True - - -def ceph_version(): - """Retrieve the local version of ceph.""" - if os.path.exists('/usr/bin/ceph'): - cmd = ['ceph', '-v'] - output = check_output(cmd).decode('US-ASCII') - output = output.split() - if len(output) > 3: - return output[2] - else: - return None - else: - return None - - -class CephBrokerRq(object): - """Ceph broker request. - - Multiple operations can be added to a request and sent to the Ceph broker - to be executed. - - Request is json-encoded for sending over the wire. - - The API is versioned and defaults to version 1. - """ - - def __init__(self, api_version=1, request_id=None): - self.api_version = api_version - if request_id: - self.request_id = request_id - else: - self.request_id = str(uuid.uuid1()) - self.ops = [] - - def add_op_create_pool(self, name, replica_count=3, pg_num=None): - """Adds an operation to create a pool. - - @param pg_num setting: optional setting. If not provided, this value - will be calculated by the broker based on how many OSDs are in the - cluster at the time of creation. Note that, if provided, this value - will be capped at the current available maximum. - """ - self.ops.append({'op': 'create-pool', 'name': name, - 'replicas': replica_count, 'pg_num': pg_num}) - - def set_ops(self, ops): - """Set request ops to provided value. - - Useful for injecting ops that come from a previous request - to allow comparisons to ensure validity. 
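A broker request built with the CephBrokerRq class above is just a JSON document carrying a uuid, the API version and a list of ops; a minimal usage sketch (the pool name is a placeholder):

from charmhelpers.contrib.storage.linux.ceph import CephBrokerRq

rq = CephBrokerRq()
rq.add_op_create_pool(name='mypool', replica_count=3)
# rq.request serialises to JSON with 'api-version', 'ops' and 'request-id'
# keys, ready to be passed to relation_set(broker_req=...).
print(rq.request)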
- """ - self.ops = ops - - @property - def request(self): - return json.dumps({'api-version': self.api_version, 'ops': self.ops, - 'request-id': self.request_id}) - - def _ops_equal(self, other): - if len(self.ops) == len(other.ops): - for req_no in range(0, len(self.ops)): - for key in ['replicas', 'name', 'op', 'pg_num']: - if self.ops[req_no].get(key) != other.ops[req_no].get(key): - return False - else: - return False - return True - - def __eq__(self, other): - if not isinstance(other, self.__class__): - return False - if self.api_version == other.api_version and \ - self._ops_equal(other): - return True - else: - return False - - def __ne__(self, other): - return not self.__eq__(other) - - -class CephBrokerRsp(object): - """Ceph broker response. - - Response is json-decoded and contents provided as methods/properties. - - The API is versioned and defaults to version 1. - """ - - def __init__(self, encoded_rsp): - self.api_version = None - self.rsp = json.loads(encoded_rsp) - - @property - def request_id(self): - return self.rsp.get('request-id') - - @property - def exit_code(self): - return self.rsp.get('exit-code') - - @property - def exit_msg(self): - return self.rsp.get('stderr') - - -# Ceph Broker Conversation: -# If a charm needs an action to be taken by ceph it can create a CephBrokerRq -# and send that request to ceph via the ceph relation. The CephBrokerRq has a -# unique id so that the client can identity which CephBrokerRsp is associated -# with the request. Ceph will also respond to each client unit individually -# creating a response key per client unit eg glance/0 will get a CephBrokerRsp -# via key broker-rsp-glance-0 -# -# To use this the charm can just do something like: -# -# from charmhelpers.contrib.storage.linux.ceph import ( -# send_request_if_needed, -# is_request_complete, -# CephBrokerRq, -# ) -# -# @hooks.hook('ceph-relation-changed') -# def ceph_changed(): -# rq = CephBrokerRq() -# rq.add_op_create_pool(name='poolname', replica_count=3) -# -# if is_request_complete(rq): -# -# else: -# send_request_if_needed(get_ceph_request()) -# -# CephBrokerRq and CephBrokerRsp are serialized into JSON. Below is an example -# of glance having sent a request to ceph which ceph has successfully processed -# 'ceph:8': { -# 'ceph/0': { -# 'auth': 'cephx', -# 'broker-rsp-glance-0': '{"request-id": "0bc7dc54", "exit-code": 0}', -# 'broker_rsp': '{"request-id": "0da543b8", "exit-code": 0}', -# 'ceph-public-address': '10.5.44.103', -# 'key': 'AQCLDttVuHXINhAAvI144CB09dYchhHyTUY9BQ==', -# 'private-address': '10.5.44.103', -# }, -# 'glance/0': { -# 'broker_req': ('{"api-version": 1, "request-id": "0bc7dc54", ' -# '"ops": [{"replicas": 3, "name": "glance", ' -# '"op": "create-pool"}]}'), -# 'private-address': '10.5.44.109', -# }, -# } - -def get_previous_request(rid): - """Return the last ceph broker request sent on a given relation - - @param rid: Relation id to query for request - """ - request = None - broker_req = relation_get(attribute='broker_req', rid=rid, - unit=local_unit()) - if broker_req: - request_data = json.loads(broker_req) - request = CephBrokerRq(api_version=request_data['api-version'], - request_id=request_data['request-id']) - request.set_ops(request_data['ops']) - - return request - - -def get_request_states(request, relation='ceph'): - """Return a dict of requests per relation id with their corresponding - completion state. 
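The per-unit response key mentioned in the conversation notes above follows a simple naming rule, mirrored by get_broker_rsp_key() further down: the unit name with '/' replaced by '-', prefixed with 'broker-rsp-'. For example:

def broker_rsp_key(unit_name):
    # Mirrors get_broker_rsp_key(), but takes the unit name explicitly.
    return 'broker-rsp-' + unit_name.replace('/', '-')


assert broker_rsp_key('glance/0') == 'broker-rsp-glance-0'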
- - This allows a charm, which has a request for ceph, to see whether there is - an equivalent request already being processed and if so what state that - request is in. - - @param request: A CephBrokerRq object - """ - complete = [] - requests = {} - for rid in relation_ids(relation): - complete = False - previous_request = get_previous_request(rid) - if request == previous_request: - sent = True - complete = is_request_complete_for_rid(previous_request, rid) - else: - sent = False - complete = False - - requests[rid] = { - 'sent': sent, - 'complete': complete, - } - - return requests - - -def is_request_sent(request, relation='ceph'): - """Check to see if a functionally equivalent request has already been sent - - Returns True if a similair request has been sent - - @param request: A CephBrokerRq object - """ - states = get_request_states(request, relation=relation) - for rid in states.keys(): - if not states[rid]['sent']: - return False - - return True - - -def is_request_complete(request, relation='ceph'): - """Check to see if a functionally equivalent request has already been - completed - - Returns True if a similair request has been completed - - @param request: A CephBrokerRq object - """ - states = get_request_states(request, relation=relation) - for rid in states.keys(): - if not states[rid]['complete']: - return False - - return True - - -def is_request_complete_for_rid(request, rid): - """Check if a given request has been completed on the given relation - - @param request: A CephBrokerRq object - @param rid: Relation ID - """ - broker_key = get_broker_rsp_key() - for unit in related_units(rid): - rdata = relation_get(rid=rid, unit=unit) - if rdata.get(broker_key): - rsp = CephBrokerRsp(rdata.get(broker_key)) - if rsp.request_id == request.request_id: - if not rsp.exit_code: - return True - else: - # The remote unit sent no reply targeted at this unit so either the - # remote ceph cluster does not support unit targeted replies or it - # has not processed our request yet. 
- if rdata.get('broker_rsp'): - request_data = json.loads(rdata['broker_rsp']) - if request_data.get('request-id'): - log('Ignoring legacy broker_rsp without unit key as remote ' - 'service supports unit specific replies', level=DEBUG) - else: - log('Using legacy broker_rsp as remote service does not ' - 'supports unit specific replies', level=DEBUG) - rsp = CephBrokerRsp(rdata['broker_rsp']) - if not rsp.exit_code: - return True - - return False - - -def get_broker_rsp_key(): - """Return broker response key for this unit - - This is the key that ceph is going to use to pass request status - information back to this unit - """ - return 'broker-rsp-' + local_unit().replace('/', '-') - - -def send_request_if_needed(request, relation='ceph'): - """Send broker request if an equivalent request has not already been sent - - @param request: A CephBrokerRq object - """ - if is_request_sent(request, relation=relation): - log('Request already sent but not complete, not sending new request', - level=DEBUG) - else: - for rid in relation_ids(relation): - log('Sending request {}'.format(request.request_id), level=DEBUG) - relation_set(relation_id=rid, broker_req=request.request) diff --git a/templates/ceph.conf b/templates/ceph.conf index 7fec00e5..66da0aca 100644 --- a/templates/ceph.conf +++ b/templates/ceph.conf @@ -33,8 +33,6 @@ cluster addr = {{ cluster_addr }} osd crush location = {{crush_location}} {% endif %} -[client.osd-upgrade] -keyring = /var/lib/ceph/osd/ceph.client.osd-upgrade.keyring [mon] keyring = /var/lib/ceph/mon/$cluster-$id/keyring diff --git a/tests/basic_deployment.py b/tests/basic_deployment.py index 7800a00d..49a10b11 100644 --- a/tests/basic_deployment.py +++ b/tests/basic_deployment.py @@ -43,8 +43,8 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment): and the rest of the service are from lp branches that are compatible with the local charm (e.g. stable or next). 
""" - this_service = {'name': 'ceph-osd', 'units': 3} - other_services = [{'name': 'ceph-mon', 'units': 3}, + this_service = {'name': 'ceph-osd'} + other_services = [{'name': 'ceph', 'units': 3}, {'name': 'mysql'}, {'name': 'keystone'}, {'name': 'rabbitmq-server'}, @@ -60,18 +60,18 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment): 'nova-compute:shared-db': 'mysql:shared-db', 'nova-compute:amqp': 'rabbitmq-server:amqp', 'nova-compute:image-service': 'glance:image-service', - 'nova-compute:ceph': 'ceph-mon:client', + 'nova-compute:ceph': 'ceph:client', 'keystone:shared-db': 'mysql:shared-db', 'glance:shared-db': 'mysql:shared-db', 'glance:identity-service': 'keystone:identity-service', 'glance:amqp': 'rabbitmq-server:amqp', - 'glance:ceph': 'ceph-mon:client', + 'glance:ceph': 'ceph:client', 'cinder:shared-db': 'mysql:shared-db', 'cinder:identity-service': 'keystone:identity-service', 'cinder:amqp': 'rabbitmq-server:amqp', 'cinder:image-service': 'glance:image-service', - 'cinder:ceph': 'ceph-mon:client', - 'ceph-osd:mon': 'ceph-mon:osd' + 'cinder:ceph': 'ceph:client', + 'ceph-osd:mon': 'ceph:osd' } super(CephOsdBasicDeployment, self)._add_relations(relations) @@ -86,6 +86,9 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment): 'auth-supported': 'none', 'fsid': '6547bd3e-1397-11e2-82e5-53567c8d32dc', 'monitor-secret': 'AQCXrnZQwI7KGBAAiPofmKEXKxu5bUzoYLVkbQ==', + 'osd-reformat': 'yes', + 'ephemeral-unmount': '/mnt', + 'osd-devices': '/dev/vdb /srv/ceph' } # Include a non-existent device as osd-devices is a whitelist, @@ -99,7 +102,7 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment): configs = {'keystone': keystone_config, 'mysql': mysql_config, 'cinder': cinder_config, - 'ceph-mon': ceph_config, + 'ceph': ceph_config, 'ceph-osd': ceph_osd_config} super(CephOsdBasicDeployment, self)._configure_services(configs) @@ -112,12 +115,10 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment): self.nova_sentry = self.d.sentry.unit['nova-compute/0'] self.glance_sentry = self.d.sentry.unit['glance/0'] self.cinder_sentry = self.d.sentry.unit['cinder/0'] - self.ceph0_sentry = self.d.sentry.unit['ceph-mon/0'] - self.ceph1_sentry = self.d.sentry.unit['ceph-mon/1'] - self.ceph2_sentry = self.d.sentry.unit['ceph-mon/2'] + self.ceph0_sentry = self.d.sentry.unit['ceph/0'] + self.ceph1_sentry = self.d.sentry.unit['ceph/1'] + self.ceph2_sentry = self.d.sentry.unit['ceph/2'] self.ceph_osd_sentry = self.d.sentry.unit['ceph-osd/0'] - self.ceph_osd1_sentry = self.d.sentry.unit['ceph-osd/1'] - self.ceph_osd2_sentry = self.d.sentry.unit['ceph-osd/2'] u.log.debug('openstack release val: {}'.format( self._get_openstack_release())) u.log.debug('openstack release str: {}'.format( @@ -176,6 +177,7 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment): # Process name and quantity of processes to expect on each unit ceph_processes = { 'ceph-mon': 1, + 'ceph-osd': 2 } # Units with process names and PID quantities expected @@ -212,6 +214,9 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment): ceph_services = [ 'ceph-mon-all', 'ceph-mon id=`hostname`', + 'ceph-osd-all', + 'ceph-osd id={}'.format(u.get_ceph_osd_id_cmd(0)), + 'ceph-osd id={}'.format(u.get_ceph_osd_id_cmd(1)) ] services[self.ceph0_sentry] = ceph_services services[self.ceph1_sentry] = ceph_services @@ -228,16 +233,16 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment): def test_200_ceph_osd_ceph_relation(self): """Verify the ceph-osd to ceph relation data.""" - u.log.debug('Checking ceph-osd:ceph-mon relation data...') + 
u.log.debug('Checking ceph-osd:ceph mon relation data...') unit = self.ceph_osd_sentry - relation = ['mon', 'ceph-mon:osd'] + relation = ['mon', 'ceph:osd'] expected = { 'private-address': u.valid_ip } ret = u.validate_relation_data(unit, relation, expected) if ret: - message = u.relation_error('ceph-osd to ceph-mon', ret) + message = u.relation_error('ceph-osd to ceph', ret) amulet.raise_status(amulet.FAIL, msg=message) def test_201_ceph0_to_ceph_osd_relation(self): diff --git a/unit_tests/test_upgrade_roll.py b/unit_tests/test_upgrade_roll.py deleted file mode 100644 index 840e247c..00000000 --- a/unit_tests/test_upgrade_roll.py +++ /dev/null @@ -1,157 +0,0 @@ -import time - -__author__ = 'chris' -from mock import patch, call, MagicMock -import sys - -sys.path.append('/home/chris/repos/ceph-osd/hooks') - -from ceph import CrushLocation - -import test_utils -import ceph_hooks - -TO_PATCH = [ - 'apt_install', - 'apt_update', - 'add_source', - 'config', - 'ceph', - 'get_conf', - 'hookenv', - 'host', - 'log', - 'service_start', - 'service_stop', - 'socket', - 'status_set', -] - - -def config_side_effect(*args): - if args[0] == 'source': - return 'cloud:trusty-kilo' - elif args[0] == 'key': - return 'key' - elif args[0] == 'release-version': - return 'cloud:trusty-kilo' - - -previous_node_start_time = time.time() - (9 * 60) - - -def monitor_key_side_effect(*args): - if args[1] == \ - 'ip-192-168-1-2_done': - return False - elif args[1] == \ - 'ip-192-168-1-2_start': - # Return that the previous node started 9 minutes ago - return previous_node_start_time - - -class UpgradeRollingTestCase(test_utils.CharmTestCase): - def setUp(self): - super(UpgradeRollingTestCase, self).setUp(ceph_hooks, TO_PATCH) - - @patch('ceph_hooks.roll_osd_cluster') - def test_check_for_upgrade(self, roll_osd_cluster): - self.host.lsb_release.return_value = { - 'DISTRIB_CODENAME': 'trusty', - } - previous_mock = MagicMock().return_value - previous_mock.previous.return_value = "cloud:trusty-juno" - self.hookenv.config.side_effect = [previous_mock, - config_side_effect('source')] - ceph_hooks.check_for_upgrade() - - roll_osd_cluster.assert_called_with('cloud:trusty-kilo') - - @patch('ceph_hooks.upgrade_osd') - @patch('ceph_hooks.monitor_key_set') - def test_lock_and_roll(self, monitor_key_set, upgrade_osd): - monitor_key_set.monitor_key_set.return_value = None - ceph_hooks.lock_and_roll(my_name='ip-192-168-1-2') - upgrade_osd.assert_called_once_with() - - def test_upgrade_osd(self): - self.config.side_effect = config_side_effect - self.ceph.get_version.return_value = "0.80" - self.ceph.systemd.return_value = False - ceph_hooks.upgrade_osd() - self.service_stop.assert_called_with('ceph-osd-all') - self.service_start.assert_called_with('ceph-osd-all') - self.status_set.assert_has_calls([ - call('maintenance', 'Upgrading osd'), - ]) - - @patch('ceph_hooks.lock_and_roll') - @patch('ceph_hooks.get_upgrade_position') - def test_roll_osd_cluster_first(self, - get_upgrade_position, - lock_and_roll): - self.socket.gethostname.return_value = "ip-192-168-1-2" - self.ceph.get_osd_tree.return_value = "" - get_upgrade_position.return_value = 0 - ceph_hooks.roll_osd_cluster('0.94.1') - lock_and_roll.assert_called_with(my_name="ip-192-168-1-2") - - @patch('ceph_hooks.lock_and_roll') - @patch('ceph_hooks.get_upgrade_position') - @patch('ceph_hooks.wait_on_previous_node') - def test_roll_osd_cluster_second(self, - wait_on_previous_node, - get_upgrade_position, - lock_and_roll): - wait_on_previous_node.return_value = None - 
self.socket.gethostname.return_value = "ip-192-168-1-3" - self.ceph.get_osd_tree.return_value = [ - CrushLocation( - name="ip-192-168-1-2", - identifier='a', - host='host-a', - rack='rack-a', - row='row-a', - datacenter='dc-1', - chassis='chassis-a', - root='ceph'), - CrushLocation( - name="ip-192-168-1-3", - identifier='a', - host='host-b', - rack='rack-a', - row='row-a', - datacenter='dc-1', - chassis='chassis-a', - root='ceph') - ] - get_upgrade_position.return_value = 1 - ceph_hooks.roll_osd_cluster('0.94.1') - self.status_set.assert_called_with( - 'blocked', - 'Waiting on ip-192-168-1-2 to finish upgrading') - lock_and_roll.assert_called_with(my_name="ip-192-168-1-3") - - @patch('ceph_hooks.monitor_key_get') - @patch('ceph_hooks.monitor_key_exists') - def test_wait_on_previous_node(self, - monitor_key_exists, - monitor_key_get): - monitor_key_get.side_effect = monitor_key_side_effect - monitor_key_exists.return_value = False - - ceph_hooks.wait_on_previous_node("ip-192-168-1-2") - - # Make sure we checked to see if the previous node started - monitor_key_get.assert_has_calls( - [call('osd-upgrade', 'ip-192-168-1-2_start')] - ) - # Make sure we checked to see if the previous node was finished - monitor_key_exists.assert_has_calls( - [call('osd-upgrade', 'ip-192-168-1-2_done')] - ) - # Make sure we waited at last once before proceeding - self.log.assert_has_calls( - [call('Previous node is: ip-192-168-1-2')], - [call('ip-192-168-1-2 is not finished. Waiting')], - )
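Finally, the ordering contract the removed roll_osd_cluster() and test_upgrade_roll.py rely on: CRUSH hosts are sorted by name, and each unit's position in that list decides whether it rolls first or waits. A self-contained sketch, with a namedtuple standing in for the removed CrushLocation class:

import collections

Host = collections.namedtuple('Host', 'name')


def upgrade_position(osd_tree, my_name):
    """Index of my_name in the name-sorted host list, or None if absent."""
    for index, item in enumerate(sorted(osd_tree, key=lambda h: h.name)):
        if item.name == my_name:
            return index
    return None


hosts = [Host('ip-192-168-1-3'), Host('ip-192-168-1-2')]
assert upgrade_position(hosts, 'ip-192-168-1-2') == 0  # rolls first
assert upgrade_position(hosts, 'ip-192-168-1-3') == 1  # waits on -1-2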