[hopem, r=chris.macnaughton]

charmhelpers sync
    

    add pre-config of lighter pools

   

    make lightweight pool pg_num configurable
This commit is contained in:
Liam Young 2016-01-22 15:38:47 +00:00
commit bff5923c60
20 changed files with 551 additions and 74 deletions

View File

@ -108,6 +108,27 @@ options:
the following public endpoint for the ceph-radosgw:
https://files.example.com:80/swift/v1
ceph-osd-replication-count:
type: int
default: 3
description: |
This value dictates the number of replicas ceph must make of any object
it stores within RGW pools. Note that once the RGW pools have been
created, changing this value will not have any effect (although it can be
changed in ceph by manually configuring your ceph cluster).
rgw-lightweight-pool-pg-num:
type: int
default: 64
description: |
When the Rados Gatway is installed it, by default, creates pools with
pg_num 8 which, in the majority of cases is suboptimal. A few rgw pools
tend to carry more data than others e.g. .rgw.buckets tends to be larger
than most. So, for pools with greater requirements than others the charm
will apply the optimal value i.e. corresponding to the number of OSDs
up+in the cluster at the time the pool is created. For others it will use
this value which can be altered depending on how big you cluster is. Note
that once a pool has been created, changes to this setting will be
ignored.
haproxy-server-timeout:
type: int
default:

View File

@ -14,6 +14,14 @@ import os
from socket import gethostname as get_unit_hostname
from charmhelpers.core.hookenv import (
config,
)
from charmhelpers.contrib.storage.linux.ceph import (
CephBrokerRq,
)
LEADER = 'leader'
PEON = 'peon'
QUORUM = [LEADER, PEON]
@ -219,3 +227,46 @@ def get_named_key(name, caps=None):
if 'key' in element:
key = element.split(' = ')[1].strip() # IGNORE:E1103
return key
def get_create_rgw_pools_rq():
"""Pre-create RGW pools so that they have the correct settings.
When RGW creates its own pools it will create them with non-optimal
settings (LP: #1476749).
NOTE: see http://docs.ceph.com/docs/master/radosgw/config-ref/#pools and
http://docs.ceph.com/docs/master/radosgw/config/#create-pools for
list of supported/required pools.
"""
rq = CephBrokerRq()
replicas = config('ceph-osd-replication-count')
# Buckets likely to contain the most data and therefore requiring the most
# PGs
heavy = ['.rgw.buckets']
for pool in heavy:
rq.add_op_create_pool(name=pool, replica_count=replicas)
# NOTE: we want these pools to have a smaller pg_num/pgp_num than the
# others since they are not expected to contain as much data
light = ['.rgw',
'.rgw.root',
'.rgw.control',
'.rgw.gc',
'.rgw.buckets',
'.rgw.buckets.index',
'.rgw.buckets.extra',
'.log',
'.intent-log'
'.usage',
'.users'
'.users.email'
'.users.swift'
'.users.uid']
pg_num = config('rgw-lightweight-pool-pg-num')
for pool in light:
rq.add_op_create_pool(name=pool, replica_count=replicas, pg_num=pg_num)
return rq

View File

@ -20,7 +20,7 @@ import sys
from six.moves import zip
from charmhelpers.core import unitdata
import charmhelpers.core.unitdata
class OutputFormatter(object):
@ -163,8 +163,8 @@ class CommandLine(object):
if getattr(arguments.func, '_cli_no_output', False):
output = ''
self.formatter.format_output(output, arguments.format)
if unitdata._KV:
unitdata._KV.flush()
if charmhelpers.core.unitdata._KV:
charmhelpers.core.unitdata._KV.flush()
cmdline = CommandLine()

View File

@ -14,12 +14,18 @@
# You should have received a copy of the GNU Lesser General Public License
# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
import logging
import re
import sys
import six
from collections import OrderedDict
from charmhelpers.contrib.amulet.deployment import (
AmuletDeployment
)
DEBUG = logging.DEBUG
ERROR = logging.ERROR
class OpenStackAmuletDeployment(AmuletDeployment):
"""OpenStack amulet deployment.
@ -28,9 +34,12 @@ class OpenStackAmuletDeployment(AmuletDeployment):
that is specifically for use by OpenStack charms.
"""
def __init__(self, series=None, openstack=None, source=None, stable=True):
def __init__(self, series=None, openstack=None, source=None,
stable=True, log_level=DEBUG):
"""Initialize the deployment environment."""
super(OpenStackAmuletDeployment, self).__init__(series)
self.log = self.get_logger(level=log_level)
self.log.info('OpenStackAmuletDeployment: init')
self.openstack = openstack
self.source = source
self.stable = stable
@ -38,6 +47,22 @@ class OpenStackAmuletDeployment(AmuletDeployment):
# out.
self.current_next = "trusty"
def get_logger(self, name="deployment-logger", level=logging.DEBUG):
"""Get a logger object that will log to stdout."""
log = logging
logger = log.getLogger(name)
fmt = log.Formatter("%(asctime)s %(funcName)s "
"%(levelname)s: %(message)s")
handler = log.StreamHandler(stream=sys.stdout)
handler.setLevel(level)
handler.setFormatter(fmt)
logger.addHandler(handler)
logger.setLevel(level)
return logger
def _determine_branch_locations(self, other_services):
"""Determine the branch locations for the other services.
@ -45,6 +70,8 @@ class OpenStackAmuletDeployment(AmuletDeployment):
stable or next (dev) branch, and based on this, use the corresonding
stable or next branches for the other_services."""
self.log.info('OpenStackAmuletDeployment: determine branch locations')
# Charms outside the lp:~openstack-charmers namespace
base_charms = ['mysql', 'mongodb', 'nrpe']
@ -82,6 +109,8 @@ class OpenStackAmuletDeployment(AmuletDeployment):
def _add_services(self, this_service, other_services):
"""Add services to the deployment and set openstack-origin/source."""
self.log.info('OpenStackAmuletDeployment: adding services')
other_services = self._determine_branch_locations(other_services)
super(OpenStackAmuletDeployment, self)._add_services(this_service,
@ -95,7 +124,8 @@ class OpenStackAmuletDeployment(AmuletDeployment):
'ceph-osd', 'ceph-radosgw']
# Charms which can not use openstack-origin, ie. many subordinates
no_origin = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe']
no_origin = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe',
'openvswitch-odl', 'neutron-api-odl', 'odl-controller']
if self.openstack:
for svc in services:
@ -111,9 +141,79 @@ class OpenStackAmuletDeployment(AmuletDeployment):
def _configure_services(self, configs):
"""Configure all of the services."""
self.log.info('OpenStackAmuletDeployment: configure services')
for service, config in six.iteritems(configs):
self.d.configure(service, config)
def _auto_wait_for_status(self, message=None, exclude_services=None,
include_only=None, timeout=1800):
"""Wait for all units to have a specific extended status, except
for any defined as excluded. Unless specified via message, any
status containing any case of 'ready' will be considered a match.
Examples of message usage:
Wait for all unit status to CONTAIN any case of 'ready' or 'ok':
message = re.compile('.*ready.*|.*ok.*', re.IGNORECASE)
Wait for all units to reach this status (exact match):
message = re.compile('^Unit is ready and clustered$')
Wait for all units to reach any one of these (exact match):
message = re.compile('Unit is ready|OK|Ready')
Wait for at least one unit to reach this status (exact match):
message = {'ready'}
See Amulet's sentry.wait_for_messages() for message usage detail.
https://github.com/juju/amulet/blob/master/amulet/sentry.py
:param message: Expected status match
:param exclude_services: List of juju service names to ignore,
not to be used in conjuction with include_only.
:param include_only: List of juju service names to exclusively check,
not to be used in conjuction with exclude_services.
:param timeout: Maximum time in seconds to wait for status match
:returns: None. Raises if timeout is hit.
"""
self.log.info('Waiting for extended status on units...')
all_services = self.d.services.keys()
if exclude_services and include_only:
raise ValueError('exclude_services can not be used '
'with include_only')
if message:
if isinstance(message, re._pattern_type):
match = message.pattern
else:
match = message
self.log.debug('Custom extended status wait match: '
'{}'.format(match))
else:
self.log.debug('Default extended status wait match: contains '
'READY (case-insensitive)')
message = re.compile('.*ready.*', re.IGNORECASE)
if exclude_services:
self.log.debug('Excluding services from extended status match: '
'{}'.format(exclude_services))
else:
exclude_services = []
if include_only:
services = include_only
else:
services = list(set(all_services) - set(exclude_services))
self.log.debug('Waiting up to {}s for extended status on services: '
'{}'.format(timeout, services))
service_messages = {service: message for service in services}
self.d.sentry.wait_for_messages(service_messages, timeout=timeout)
self.log.info('OK')
def _get_openstack_release(self):
"""Get openstack release.

View File

@ -18,6 +18,7 @@ import amulet
import json
import logging
import os
import re
import six
import time
import urllib
@ -604,7 +605,22 @@ class OpenStackAmuletUtils(AmuletUtils):
'{}'.format(sample_type, samples))
return None
# rabbitmq/amqp specific helpers:
# rabbitmq/amqp specific helpers:
def rmq_wait_for_cluster(self, deployment, init_sleep=15, timeout=1200):
"""Wait for rmq units extended status to show cluster readiness,
after an optional initial sleep period. Initial sleep is likely
necessary to be effective following a config change, as status
message may not instantly update to non-ready."""
if init_sleep:
time.sleep(init_sleep)
message = re.compile('^Unit is ready and clustered$')
deployment._auto_wait_for_status(message=message,
timeout=timeout,
include_only=['rabbitmq-server'])
def add_rmq_test_user(self, sentry_units,
username="testuser1", password="changeme"):
"""Add a test user via the first rmq juju unit, check connection as
@ -805,7 +821,10 @@ class OpenStackAmuletUtils(AmuletUtils):
if port:
config['ssl_port'] = port
deployment.configure('rabbitmq-server', config)
deployment.d.configure('rabbitmq-server', config)
# Wait for unit status
self.rmq_wait_for_cluster(deployment)
# Confirm
tries = 0
@ -832,7 +851,10 @@ class OpenStackAmuletUtils(AmuletUtils):
# Disable RMQ SSL
config = {'ssl': 'off'}
deployment.configure('rabbitmq-server', config)
deployment.d.configure('rabbitmq-server', config)
# Wait for unit status
self.rmq_wait_for_cluster(deployment)
# Confirm
tries = 0

View File

@ -958,6 +958,19 @@ class NeutronContext(OSContextGenerator):
'config': config}
return ovs_ctxt
def midonet_ctxt(self):
driver = neutron_plugin_attribute(self.plugin, 'driver',
self.network_manager)
midonet_config = neutron_plugin_attribute(self.plugin, 'config',
self.network_manager)
mido_ctxt = {'core_plugin': driver,
'neutron_plugin': 'midonet',
'neutron_security_groups': self.neutron_security_groups,
'local_ip': unit_private_ip(),
'config': midonet_config}
return mido_ctxt
def __call__(self):
if self.network_manager not in ['quantum', 'neutron']:
return {}
@ -979,6 +992,8 @@ class NeutronContext(OSContextGenerator):
ctxt.update(self.nuage_ctxt())
elif self.plugin == 'plumgrid':
ctxt.update(self.pg_ctxt())
elif self.plugin == 'midonet':
ctxt.update(self.midonet_ctxt())
alchemy_flags = config('neutron-alchemy-flags')
if alchemy_flags:
@ -1111,7 +1126,7 @@ class SubordinateConfigContext(OSContextGenerator):
ctxt = {
... other context ...
'subordinate_config': {
'subordinate_configuration': {
'DEFAULT': {
'key1': 'value1',
},
@ -1152,22 +1167,23 @@ class SubordinateConfigContext(OSContextGenerator):
try:
sub_config = json.loads(sub_config)
except:
log('Could not parse JSON from subordinate_config '
'setting from %s' % rid, level=ERROR)
log('Could not parse JSON from '
'subordinate_configuration setting from %s'
% rid, level=ERROR)
continue
for service in self.services:
if service not in sub_config:
log('Found subordinate_config on %s but it contained'
'nothing for %s service' % (rid, service),
level=INFO)
log('Found subordinate_configuration on %s but it '
'contained nothing for %s service'
% (rid, service), level=INFO)
continue
sub_config = sub_config[service]
if self.config_file not in sub_config:
log('Found subordinate_config on %s but it contained'
'nothing for %s' % (rid, self.config_file),
level=INFO)
log('Found subordinate_configuration on %s but it '
'contained nothing for %s'
% (rid, self.config_file), level=INFO)
continue
sub_config = sub_config[self.config_file]

View File

@ -204,11 +204,25 @@ def neutron_plugins():
database=config('database'),
ssl_dir=NEUTRON_CONF_DIR)],
'services': [],
'packages': [['plumgrid-lxc'],
['iovisor-dkms']],
'packages': ['plumgrid-lxc',
'iovisor-dkms'],
'server_packages': ['neutron-server',
'neutron-plugin-plumgrid'],
'server_services': ['neutron-server']
},
'midonet': {
'config': '/etc/neutron/plugins/midonet/midonet.ini',
'driver': 'midonet.neutron.plugin.MidonetPluginV2',
'contexts': [
context.SharedDBContext(user=config('neutron-database-user'),
database=config('neutron-database'),
relation_prefix='neutron',
ssl_dir=NEUTRON_CONF_DIR)],
'services': [],
'packages': [[headers_package()] + determine_dkms_package()],
'server_packages': ['neutron-server',
'python-neutron-plugin-midonet'],
'server_services': ['neutron-server']
}
}
if release >= 'icehouse':

View File

@ -26,6 +26,7 @@ import re
import six
import traceback
import uuid
import yaml
from charmhelpers.contrib.network import ip
@ -41,6 +42,7 @@ from charmhelpers.core.hookenv import (
log as juju_log,
charm_dir,
INFO,
related_units,
relation_ids,
relation_set,
status_set,
@ -121,6 +123,7 @@ SWIFT_CODENAMES = OrderedDict([
('2.2.2', 'kilo'),
('2.3.0', 'liberty'),
('2.4.0', 'liberty'),
('2.5.0', 'liberty'),
])
# >= Liberty version->codename mapping
@ -858,7 +861,9 @@ def set_os_workload_status(configs, required_interfaces, charm_func=None):
if charm_state != 'active' and charm_state != 'unknown':
state = workload_state_compare(state, charm_state)
if message:
message = "{} {}".format(message, charm_message)
charm_message = charm_message.replace("Incomplete relations: ",
"")
message = "{}, {}".format(message, charm_message)
else:
message = charm_message
@ -975,3 +980,19 @@ def do_action_openstack_upgrade(package, upgrade_callback, configs):
action_set({'outcome': 'no upgrade available.'})
return ret
def remote_restart(rel_name, remote_service=None):
trigger = {
'restart-trigger': str(uuid.uuid4()),
}
if remote_service:
trigger['remote-service'] = remote_service
for rid in relation_ids(rel_name):
# This subordinate can be related to two seperate services using
# different subordinate relations so only issue the restart if
# the principle is conencted down the relation we think it is
if related_units(relid=rid):
relation_set(relation_id=rid,
relation_settings=trigger,
)

View File

@ -26,6 +26,7 @@
import os
import shutil
import six
import json
import time
import uuid
@ -125,29 +126,37 @@ def get_osds(service):
return None
def create_pool(service, name, replicas=3):
def update_pool(client, pool, settings):
cmd = ['ceph', '--id', client, 'osd', 'pool', 'set', pool]
for k, v in six.iteritems(settings):
cmd.append(k)
cmd.append(v)
check_call(cmd)
def create_pool(service, name, replicas=3, pg_num=None):
"""Create a new RADOS pool."""
if pool_exists(service, name):
log("Ceph pool {} already exists, skipping creation".format(name),
level=WARNING)
return
# Calculate the number of placement groups based
# on upstream recommended best practices.
osds = get_osds(service)
if osds:
pgnum = (len(osds) * 100 // replicas)
else:
# NOTE(james-page): Default to 200 for older ceph versions
# which don't support OSD query from cli
pgnum = 200
if not pg_num:
# Calculate the number of placement groups based
# on upstream recommended best practices.
osds = get_osds(service)
if osds:
pg_num = (len(osds) * 100 // replicas)
else:
# NOTE(james-page): Default to 200 for older ceph versions
# which don't support OSD query from cli
pg_num = 200
cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pgnum)]
cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pg_num)]
check_call(cmd)
cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', name, 'size',
str(replicas)]
check_call(cmd)
update_pool(service, name, settings={'size': str(replicas)})
def delete_pool(service, name):
@ -202,10 +211,10 @@ def create_key_file(service, key):
log('Created new keyfile at %s.' % keyfile, level=INFO)
def get_ceph_nodes():
"""Query named relation 'ceph' to determine current nodes."""
def get_ceph_nodes(relation='ceph'):
"""Query named relation to determine current nodes."""
hosts = []
for r_id in relation_ids('ceph'):
for r_id in relation_ids(relation):
for unit in related_units(r_id):
hosts.append(relation_get('private-address', unit=unit, rid=r_id))
@ -357,14 +366,14 @@ def ensure_ceph_storage(service, pool, rbd_img, sizemb, mount_point,
service_start(svc)
def ensure_ceph_keyring(service, user=None, group=None):
def ensure_ceph_keyring(service, user=None, group=None, relation='ceph'):
"""Ensures a ceph keyring is created for a named service and optionally
ensures user and group ownership.
Returns False if no ceph key is available in relation state.
"""
key = None
for rid in relation_ids('ceph'):
for rid in relation_ids(relation):
for unit in related_units(rid):
key = relation_get('key', rid=rid, unit=unit)
if key:
@ -413,9 +422,16 @@ class CephBrokerRq(object):
self.request_id = str(uuid.uuid1())
self.ops = []
def add_op_create_pool(self, name, replica_count=3):
def add_op_create_pool(self, name, replica_count=3, pg_num=None):
"""Adds an operation to create a pool.
@param pg_num setting: optional setting. If not provided, this value
will be calculated by the broker based on how many OSDs are in the
cluster at the time of creation. Note that, if provided, this value
will be capped at the current available maximum.
"""
self.ops.append({'op': 'create-pool', 'name': name,
'replicas': replica_count})
'replicas': replica_count, 'pg_num': pg_num})
def set_ops(self, ops):
"""Set request ops to provided value.
@ -433,8 +449,8 @@ class CephBrokerRq(object):
def _ops_equal(self, other):
if len(self.ops) == len(other.ops):
for req_no in range(0, len(self.ops)):
for key in ['replicas', 'name', 'op']:
if self.ops[req_no][key] != other.ops[req_no][key]:
for key in ['replicas', 'name', 'op', 'pg_num']:
if self.ops[req_no].get(key) != other.ops[req_no].get(key):
return False
else:
return False
@ -540,7 +556,7 @@ def get_previous_request(rid):
return request
def get_request_states(request):
def get_request_states(request, relation='ceph'):
"""Return a dict of requests per relation id with their corresponding
completion state.
@ -552,7 +568,7 @@ def get_request_states(request):
"""
complete = []
requests = {}
for rid in relation_ids('ceph'):
for rid in relation_ids(relation):
complete = False
previous_request = get_previous_request(rid)
if request == previous_request:
@ -570,14 +586,14 @@ def get_request_states(request):
return requests
def is_request_sent(request):
def is_request_sent(request, relation='ceph'):
"""Check to see if a functionally equivalent request has already been sent
Returns True if a similair request has been sent
@param request: A CephBrokerRq object
"""
states = get_request_states(request)
states = get_request_states(request, relation=relation)
for rid in states.keys():
if not states[rid]['sent']:
return False
@ -585,7 +601,7 @@ def is_request_sent(request):
return True
def is_request_complete(request):
def is_request_complete(request, relation='ceph'):
"""Check to see if a functionally equivalent request has already been
completed
@ -593,7 +609,7 @@ def is_request_complete(request):
@param request: A CephBrokerRq object
"""
states = get_request_states(request)
states = get_request_states(request, relation=relation)
for rid in states.keys():
if not states[rid]['complete']:
return False
@ -643,15 +659,15 @@ def get_broker_rsp_key():
return 'broker-rsp-' + local_unit().replace('/', '-')
def send_request_if_needed(request):
def send_request_if_needed(request, relation='ceph'):
"""Send broker request if an equivalent request has not already been sent
@param request: A CephBrokerRq object
"""
if is_request_sent(request):
if is_request_sent(request, relation=relation):
log('Request already sent but not complete, not sending new request',
level=DEBUG)
else:
for rid in relation_ids('ceph'):
for rid in relation_ids(relation):
log('Sending request {}'.format(request.request_id), level=DEBUG)
relation_set(relation_id=rid, broker_req=request.request)

View File

@ -76,3 +76,13 @@ def ensure_loopback_device(path, size):
check_call(cmd)
return create_loopback(path)
def is_mapped_loopback_device(device):
"""
Checks if a given device name is an existing/mapped loopback device.
:param device: str: Full path to the device (eg, /dev/loop1).
:returns: str: Path to the backing file if is a loopback device
empty string otherwise
"""
return loopback_devices().get(device, "")

View File

@ -490,6 +490,19 @@ def relation_types():
return rel_types
@cached
def peer_relation_id():
'''Get a peer relation id if a peer relation has been joined, else None.'''
md = metadata()
section = md.get('peers')
if section:
for key in section:
relids = relation_ids(key)
if relids:
return relids[0]
return None
@cached
def relation_to_interface(relation_name):
"""
@ -820,6 +833,7 @@ def status_get():
def translate_exc(from_exc, to_exc):
def inner_translate_exc1(f):
@wraps(f)
def inner_translate_exc2(*args, **kwargs):
try:
return f(*args, **kwargs)

View File

@ -67,7 +67,9 @@ def service_pause(service_name, init_dir="/etc/init", initd_dir="/etc/init.d"):
"""Pause a system service.
Stop it, and prevent it from starting again at boot."""
stopped = service_stop(service_name)
stopped = True
if service_running(service_name):
stopped = service_stop(service_name)
upstart_file = os.path.join(init_dir, "{}.conf".format(service_name))
sysv_file = os.path.join(initd_dir, service_name)
if os.path.exists(upstart_file):
@ -105,7 +107,9 @@ def service_resume(service_name, init_dir="/etc/init",
"Unable to detect {0} as either Upstart {1} or SysV {2}".format(
service_name, upstart_file, sysv_file))
started = service_start(service_name)
started = service_running(service_name)
if not started:
started = service_start(service_name)
return started
@ -566,7 +570,14 @@ def chdir(d):
os.chdir(cur)
def chownr(path, owner, group, follow_links=True):
def chownr(path, owner, group, follow_links=True, chowntopdir=False):
"""
Recursively change user and group ownership of files and directories
in given path. Doesn't chown path itself by default, only its children.
:param bool follow_links: Also Chown links if True
:param bool chowntopdir: Also chown path itself if True
"""
uid = pwd.getpwnam(owner).pw_uid
gid = grp.getgrnam(group).gr_gid
if follow_links:
@ -574,6 +585,10 @@ def chownr(path, owner, group, follow_links=True):
else:
chown = os.lchown
if chowntopdir:
broken_symlink = os.path.lexists(path) and not os.path.exists(path)
if not broken_symlink:
chown(path, uid, gid)
for root, dirs, files in os.walk(path):
for name in dirs + files:
full = os.path.join(root, name)
@ -584,3 +599,19 @@ def chownr(path, owner, group, follow_links=True):
def lchownr(path, owner, group):
chownr(path, owner, group, follow_links=False)
def get_total_ram():
'''The total amount of system RAM in bytes.
This is what is reported by the OS, and may be overcommitted when
there are multiple containers hosted on the same machine.
'''
with open('/proc/meminfo', 'r') as f:
for line in f.readlines():
if line:
key, value, unit = line.split()
if key == 'MemTotal:':
assert unit == 'kB', 'Unknown unit'
return int(value) * 1024 # Classic, not KiB.
raise NotImplementedError()

View File

@ -46,6 +46,8 @@ def hugepage_support(user, group='hugetlb', nr_hugepages=256,
group_info = add_group(group)
gid = group_info.gr_gid
add_user_to_group(user, group)
if max_map_count < 2 * nr_hugepages:
max_map_count = 2 * nr_hugepages
sysctl_settings = {
'vm.nr_hugepages': nr_hugepages,
'vm.max_map_count': max_map_count,

View File

@ -249,16 +249,18 @@ class TemplateCallback(ManagerCallback):
:param int perms: The permissions of the rendered file
:param partial on_change_action: functools partial to be executed when
rendered file changes
:param jinja2 loader template_loader: A jinja2 template loader
"""
def __init__(self, source, target,
owner='root', group='root', perms=0o444,
on_change_action=None):
on_change_action=None, template_loader=None):
self.source = source
self.target = target
self.owner = owner
self.group = group
self.perms = perms
self.on_change_action = on_change_action
self.template_loader = template_loader
def __call__(self, manager, service_name, event_name):
pre_checksum = ''
@ -269,7 +271,8 @@ class TemplateCallback(ManagerCallback):
for ctx in service.get('required_data', []):
context.update(ctx)
templating.render(self.source, self.target, context,
self.owner, self.group, self.perms)
self.owner, self.group, self.perms,
template_loader=self.template_loader)
if self.on_change_action:
if pre_checksum == host.file_hash(self.target):
hookenv.log(

View File

@ -21,7 +21,7 @@ from charmhelpers.core import hookenv
def render(source, target, context, owner='root', group='root',
perms=0o444, templates_dir=None, encoding='UTF-8'):
perms=0o444, templates_dir=None, encoding='UTF-8', template_loader=None):
"""
Render a template.
@ -52,17 +52,24 @@ def render(source, target, context, owner='root', group='root',
apt_install('python-jinja2', fatal=True)
from jinja2 import FileSystemLoader, Environment, exceptions
if templates_dir is None:
templates_dir = os.path.join(hookenv.charm_dir(), 'templates')
loader = Environment(loader=FileSystemLoader(templates_dir))
if template_loader:
template_env = Environment(loader=template_loader)
else:
if templates_dir is None:
templates_dir = os.path.join(hookenv.charm_dir(), 'templates')
template_env = Environment(loader=FileSystemLoader(templates_dir))
try:
source = source
template = loader.get_template(source)
template = template_env.get_template(source)
except exceptions.TemplateNotFound as e:
hookenv.log('Could not load template %s from %s.' %
(source, templates_dir),
level=hookenv.ERROR)
raise e
content = template.render(context)
host.mkdir(os.path.dirname(target), owner, group, perms=0o755)
target_dir = os.path.dirname(target)
if not os.path.exists(target_dir):
# This is a terrible default directory permission, as the file
# or its siblings will often contain secrets.
host.mkdir(os.path.dirname(target), owner, group, perms=0o755)
host.write_file(target, content.encode(encoding), owner, group, perms)

View File

@ -225,12 +225,12 @@ def apt_purge(packages, fatal=False):
def apt_mark(packages, mark, fatal=False):
"""Flag one or more packages using apt-mark"""
log("Marking {} as {}".format(packages, mark))
cmd = ['apt-mark', mark]
if isinstance(packages, six.string_types):
cmd.append(packages)
else:
cmd.extend(packages)
log("Holding {}".format(packages))
if fatal:
subprocess.check_call(cmd, universal_newlines=True)

View File

@ -68,6 +68,10 @@ from charmhelpers.contrib.openstack.ip import (
from charmhelpers.contrib.openstack.utils import (
set_os_workload_status,
)
from charmhelpers.contrib.storage.linux.ceph import (
send_request_if_needed,
is_request_complete,
)
APACHE_PORTS_CONF = '/etc/apache2/ports.conf'
@ -297,11 +301,16 @@ def config_changed():
'mon-relation-changed')
@restart_on_change({'/etc/ceph/ceph.conf': ['radosgw']})
def mon_relation():
CONFIGS.write_all()
key = relation_get('radosgw_key')
if key:
ceph.import_radosgw_key(key)
restart() # TODO figure out a better way todo this
rq = ceph.get_create_rgw_pools_rq()
if is_request_complete(rq, relation='mon'):
log('Broker request complete', level=DEBUG)
CONFIGS.write_all()
key = relation_get('radosgw_key')
if key:
ceph.import_radosgw_key(key)
restart() # TODO figure out a better way todo this
else:
send_request_if_needed(rq, relation='mon')
@hooks.hook('gateway-relation-joined')

View File

@ -14,12 +14,18 @@
# You should have received a copy of the GNU Lesser General Public License
# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
import logging
import re
import sys
import six
from collections import OrderedDict
from charmhelpers.contrib.amulet.deployment import (
AmuletDeployment
)
DEBUG = logging.DEBUG
ERROR = logging.ERROR
class OpenStackAmuletDeployment(AmuletDeployment):
"""OpenStack amulet deployment.
@ -28,9 +34,12 @@ class OpenStackAmuletDeployment(AmuletDeployment):
that is specifically for use by OpenStack charms.
"""
def __init__(self, series=None, openstack=None, source=None, stable=True):
def __init__(self, series=None, openstack=None, source=None,
stable=True, log_level=DEBUG):
"""Initialize the deployment environment."""
super(OpenStackAmuletDeployment, self).__init__(series)
self.log = self.get_logger(level=log_level)
self.log.info('OpenStackAmuletDeployment: init')
self.openstack = openstack
self.source = source
self.stable = stable
@ -38,6 +47,22 @@ class OpenStackAmuletDeployment(AmuletDeployment):
# out.
self.current_next = "trusty"
def get_logger(self, name="deployment-logger", level=logging.DEBUG):
"""Get a logger object that will log to stdout."""
log = logging
logger = log.getLogger(name)
fmt = log.Formatter("%(asctime)s %(funcName)s "
"%(levelname)s: %(message)s")
handler = log.StreamHandler(stream=sys.stdout)
handler.setLevel(level)
handler.setFormatter(fmt)
logger.addHandler(handler)
logger.setLevel(level)
return logger
def _determine_branch_locations(self, other_services):
"""Determine the branch locations for the other services.
@ -45,6 +70,8 @@ class OpenStackAmuletDeployment(AmuletDeployment):
stable or next (dev) branch, and based on this, use the corresonding
stable or next branches for the other_services."""
self.log.info('OpenStackAmuletDeployment: determine branch locations')
# Charms outside the lp:~openstack-charmers namespace
base_charms = ['mysql', 'mongodb', 'nrpe']
@ -82,6 +109,8 @@ class OpenStackAmuletDeployment(AmuletDeployment):
def _add_services(self, this_service, other_services):
"""Add services to the deployment and set openstack-origin/source."""
self.log.info('OpenStackAmuletDeployment: adding services')
other_services = self._determine_branch_locations(other_services)
super(OpenStackAmuletDeployment, self)._add_services(this_service,
@ -95,7 +124,8 @@ class OpenStackAmuletDeployment(AmuletDeployment):
'ceph-osd', 'ceph-radosgw']
# Charms which can not use openstack-origin, ie. many subordinates
no_origin = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe']
no_origin = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe',
'openvswitch-odl', 'neutron-api-odl', 'odl-controller']
if self.openstack:
for svc in services:
@ -111,9 +141,79 @@ class OpenStackAmuletDeployment(AmuletDeployment):
def _configure_services(self, configs):
"""Configure all of the services."""
self.log.info('OpenStackAmuletDeployment: configure services')
for service, config in six.iteritems(configs):
self.d.configure(service, config)
def _auto_wait_for_status(self, message=None, exclude_services=None,
include_only=None, timeout=1800):
"""Wait for all units to have a specific extended status, except
for any defined as excluded. Unless specified via message, any
status containing any case of 'ready' will be considered a match.
Examples of message usage:
Wait for all unit status to CONTAIN any case of 'ready' or 'ok':
message = re.compile('.*ready.*|.*ok.*', re.IGNORECASE)
Wait for all units to reach this status (exact match):
message = re.compile('^Unit is ready and clustered$')
Wait for all units to reach any one of these (exact match):
message = re.compile('Unit is ready|OK|Ready')
Wait for at least one unit to reach this status (exact match):
message = {'ready'}
See Amulet's sentry.wait_for_messages() for message usage detail.
https://github.com/juju/amulet/blob/master/amulet/sentry.py
:param message: Expected status match
:param exclude_services: List of juju service names to ignore,
not to be used in conjuction with include_only.
:param include_only: List of juju service names to exclusively check,
not to be used in conjuction with exclude_services.
:param timeout: Maximum time in seconds to wait for status match
:returns: None. Raises if timeout is hit.
"""
self.log.info('Waiting for extended status on units...')
all_services = self.d.services.keys()
if exclude_services and include_only:
raise ValueError('exclude_services can not be used '
'with include_only')
if message:
if isinstance(message, re._pattern_type):
match = message.pattern
else:
match = message
self.log.debug('Custom extended status wait match: '
'{}'.format(match))
else:
self.log.debug('Default extended status wait match: contains '
'READY (case-insensitive)')
message = re.compile('.*ready.*', re.IGNORECASE)
if exclude_services:
self.log.debug('Excluding services from extended status match: '
'{}'.format(exclude_services))
else:
exclude_services = []
if include_only:
services = include_only
else:
services = list(set(all_services) - set(exclude_services))
self.log.debug('Waiting up to {}s for extended status on services: '
'{}'.format(timeout, services))
service_messages = {service: message for service in services}
self.d.sentry.wait_for_messages(service_messages, timeout=timeout)
self.log.info('OK')
def _get_openstack_release(self):
"""Get openstack release.

View File

@ -18,6 +18,7 @@ import amulet
import json
import logging
import os
import re
import six
import time
import urllib
@ -604,7 +605,22 @@ class OpenStackAmuletUtils(AmuletUtils):
'{}'.format(sample_type, samples))
return None
# rabbitmq/amqp specific helpers:
# rabbitmq/amqp specific helpers:
def rmq_wait_for_cluster(self, deployment, init_sleep=15, timeout=1200):
"""Wait for rmq units extended status to show cluster readiness,
after an optional initial sleep period. Initial sleep is likely
necessary to be effective following a config change, as status
message may not instantly update to non-ready."""
if init_sleep:
time.sleep(init_sleep)
message = re.compile('^Unit is ready and clustered$')
deployment._auto_wait_for_status(message=message,
timeout=timeout,
include_only=['rabbitmq-server'])
def add_rmq_test_user(self, sentry_units,
username="testuser1", password="changeme"):
"""Add a test user via the first rmq juju unit, check connection as
@ -805,7 +821,10 @@ class OpenStackAmuletUtils(AmuletUtils):
if port:
config['ssl_port'] = port
deployment.configure('rabbitmq-server', config)
deployment.d.configure('rabbitmq-server', config)
# Wait for unit status
self.rmq_wait_for_cluster(deployment)
# Confirm
tries = 0
@ -832,7 +851,10 @@ class OpenStackAmuletUtils(AmuletUtils):
# Disable RMQ SSL
config = {'ssl': 'off'}
deployment.configure('rabbitmq-server', config)
deployment.d.configure('rabbitmq-server', config)
# Wait for unit status
self.rmq_wait_for_cluster(deployment)
# Confirm
tries = 0

View File

@ -186,6 +186,8 @@ class CephRadosGWTests(CharmTestCase):
self.assertTrue(_apache_modules.called)
self.assertTrue(_apache_reload.called)
@patch.object(ceph_hooks, 'is_request_complete',
lambda *args, **kwargs: True)
def test_mon_relation(self):
_ceph = self.patch('ceph')
_restart = self.patch('restart')
@ -195,6 +197,8 @@ class CephRadosGWTests(CharmTestCase):
_ceph.import_radosgw_key.assert_called_with('seckey')
self.CONFIGS.write_all.assert_called_with()
@patch.object(ceph_hooks, 'is_request_complete',
lambda *args, **kwargs: True)
def test_mon_relation_nokey(self):
_ceph = self.patch('ceph')
_restart = self.patch('restart')
@ -204,6 +208,20 @@ class CephRadosGWTests(CharmTestCase):
self.assertFalse(_restart.called)
self.CONFIGS.write_all.assert_called_with()
@patch.object(ceph_hooks, 'send_request_if_needed')
@patch.object(ceph_hooks, 'is_request_complete',
lambda *args, **kwargs: False)
def test_mon_relation_send_broker_request(self,
mock_send_request_if_needed):
_ceph = self.patch('ceph')
_restart = self.patch('restart')
self.relation_get.return_value = 'seckey'
ceph_hooks.mon_relation()
self.assertFalse(_restart.called)
self.assertFalse(_ceph.import_radosgw_key.called)
self.assertFalse(self.CONFIGS.called)
self.assertTrue(mock_send_request_if_needed.called)
def test_gateway_relation(self):
self.unit_get.return_value = 'myserver'
ceph_hooks.gateway_relation()