Add security-checklist action

Change-Id: Ie1f0dcc85c708d2e837d09c7472a35d5ffa7fd13
This commit is contained in:
Chris MacNaughton 2019-02-28 13:42:30 +01:00 committed by Chris MacNaughton
parent 733b901467
commit 35afe120af
9 changed files with 537 additions and 70 deletions

View File

@ -11,3 +11,5 @@ resume:
Resume glance services. Resume glance services.
If the glance deployment is clustered using the hacluster charm, the If the glance deployment is clustered using the hacluster charm, the
corresponding hacluster unit on the node must be resumed as well. corresponding hacluster unit on the node must be resumed as well.
security-checklist:
description: Validate the running configuration against the OpenStack security guides checklist

1
actions/security-checklist Symbolic link
View File

@ -0,0 +1 @@
security_checklist.py

124
actions/security_checklist.py Executable file
View File

@ -0,0 +1,124 @@
#!/usr/bin/env python3
#
# Copyright 2019 Canonical Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import configparser
import json
import os
import sys
sys.path.append('.')
import charmhelpers.contrib.openstack.audits as audits
from charmhelpers.contrib.openstack.audits import (
openstack_security_guide,
)
# Via the openstack_security_guide above, we are running the following
# security assertions automatically:
#
# - Check-Image-01 validate-file-ownership
# - Check-Image-02 validate-file-permissions
@audits.audit(audits.is_audit_type(audits.AuditType.OpenStackSecurityGuide),
audits.before_openstack_release('glance-common', 'rocky'))
def prevent_masked_port_scans(audit_options):
"""Validate that masked port scans are disabled.
Security Guide Check Name: Check-Image-05
:param audit_options: Dictionary of options for audit configuration
:type audit_options: Dict
:raises: AssertionError if the assertion fails.
"""
try:
with open('/etc/glance/policy.json') as f:
policy = json.loads(f.read())
except json.decoder.JSONDecodeError:
assert False, "policy.json is invalid JSON"
assert policy is not None, "policy.json should restrict copy_from"
assert policy.get('copy_from') is not None, \
"policy.json should restrict copy_from"
@audits.audit(audits.is_audit_type(audits.AuditType.OpenStackSecurityGuide))
def validate_glance_uses_keystone(audit_options):
"""Validate that the service uses Keystone for authentication.
Security Guide Check Name: Check-Image-03
:param audit_options: Dictionary of options for audit configuration
:type audit_options: Dict
:raises: AssertionError if the assertion fails.
"""
conf = configparser.ConfigParser()
conf.read(os.path.join('/etc/glance/glance-api.conf'))
glance_api = dict(conf)
assert glance_api.get('DEFAULT', {}).get('auth_strategy') == "keystone", \
"Keystone should be used for auth in glance-api.conf"
conf = configparser.ConfigParser()
conf.read(os.path.join('/etc/glance/glance-registry.conf'))
glance_registry = dict(conf)
assert glance_registry.get('DEFAULT', {}) \
.get('auth_strategy') == "keystone", \
"Keystone should be used for auth in glance-api.conf"
@audits.audit(audits.is_audit_type(audits.AuditType.OpenStackSecurityGuide))
def validate_glance_uses_tls_for_keystone(audit_options):
"""Verify that TLS is used to communicate with Keystone.
Security Guide Check Name: Check-Image-04
:param audit_options: Dictionary of options for audit configuration
:type audit_options: Dict
:raises: AssertionError if the assertion fails.
"""
conf = configparser.ConfigParser()
conf.read(os.path.join('/etc/glance/glance-api.conf'))
glance_api = dict(conf)
assert not glance_api.get('keystone_authtoken', {}).get('insecure'), \
"Insecure mode should not be used with TLS"
assert glance_api.get('keystone_authtoken', {}).get('auth_uri'). \
startswith("https://"), \
"TLS should be used to authenticate with Keystone"
conf = configparser.ConfigParser()
conf.read(os.path.join('/etc/glance/glance-registry.conf'))
glance_registry = dict(conf)
assert not glance_registry.get('keystone_authtoken', {}).get('insecure'), \
"Insecure mode should not be used with TLS"
assert glance_registry.get('keystone_authtoken', {}).get('auth_uri'). \
startswith("https://"), \
"TLS should be used to authenticate with Keystone"
def main():
config = {
'config_path': '/etc/glance',
'config_file': 'glance-api.conf',
'audit_type': audits.AuditType.OpenStackSecurityGuide,
'files': openstack_security_guide.FILE_ASSERTIONS['glance'],
'excludes': [
'validate-uses-tls-for-glance',
'validate-uses-keystone',
'validate-uses-tls-for-keystone',
],
}
return audits.action_parse_results(audits.run(config))
if __name__ == "__main__":
sys.exit(main())

View File

@ -19,7 +19,7 @@ from enum import Enum
import traceback import traceback
from charmhelpers.core.host import cmp_pkgrevno from charmhelpers.core.host import cmp_pkgrevno
import charmhelpers.contrib.openstack.utils as openstack_utils
import charmhelpers.core.hookenv as hookenv import charmhelpers.core.hookenv as hookenv
@ -39,7 +39,7 @@ def audit(*args):
deployed system that matches the given configuration deployed system that matches the given configuration
:param args: List of functions to filter tests against :param args: List of functions to filter tests against
:type args: List[Callable(Config)] :type args: List[Callable[Dict]]
""" """
def wrapper(f): def wrapper(f):
test_name = f.__name__ test_name = f.__name__
@ -58,28 +58,92 @@ def audit(*args):
def is_audit_type(*args): def is_audit_type(*args):
"""This audit is included in the specified kinds of audits.""" """This audit is included in the specified kinds of audits.
def should_run(audit_options):
:param *args: List of AuditTypes to include this audit in
:type args: List[AuditType]
:rtype: Callable[Dict]
"""
def _is_audit_type(audit_options):
if audit_options.get('audit_type') in args: if audit_options.get('audit_type') in args:
return True return True
else: else:
return False return False
return should_run return _is_audit_type
def since_package(pkg, pkg_version): def since_package(pkg, pkg_version):
"""This audit should be run after the specified package version (incl).""" """This audit should be run after the specified package version (incl).
return lambda audit_options=None: cmp_pkgrevno(pkg, pkg_version) >= 0
:param pkg: Package name to compare
:type pkg: str
:param release: The package version
:type release: str
:rtype: Callable[Dict]
"""
def _since_package(audit_options=None):
return cmp_pkgrevno(pkg, pkg_version) >= 0
return _since_package
def before_package(pkg, pkg_version): def before_package(pkg, pkg_version):
"""This audit should be run before the specified package version (excl).""" """This audit should be run before the specified package version (excl).
return lambda audit_options=None: not since_package(pkg, pkg_version)()
:param pkg: Package name to compare
:type pkg: str
:param release: The package version
:type release: str
:rtype: Callable[Dict]
"""
def _before_package(audit_options=None):
return not since_package(pkg, pkg_version)()
return _before_package
def since_openstack_release(pkg, release):
"""This audit should run after the specified OpenStack version (incl).
:param pkg: Package name to compare
:type pkg: str
:param release: The OpenStack release codename
:type release: str
:rtype: Callable[Dict]
"""
def _since_openstack_release(audit_options=None):
_release = openstack_utils.get_os_codename_package(pkg)
return openstack_utils.CompareOpenStackReleases(_release) >= release
return _since_openstack_release
def before_openstack_release(pkg, release):
"""This audit should run before the specified OpenStack version (excl).
:param pkg: Package name to compare
:type pkg: str
:param release: The OpenStack release codename
:type release: str
:rtype: Callable[Dict]
"""
def _before_openstack_release(audit_options=None):
return not since_openstack_release(pkg, release)()
return _before_openstack_release
def it_has_config(config_key): def it_has_config(config_key):
"""This audit should be run based on specified config keys.""" """This audit should be run based on specified config keys.
return lambda audit_options: audit_options.get(config_key) is not None
:param config_key: Config key to look for
:type config_key: str
:rtype: Callable[Dict]
"""
def _it_has_config(audit_options):
return audit_options.get(config_key) is not None
return _it_has_config
def run(audit_options): def run(audit_options):
@ -87,11 +151,19 @@ def run(audit_options):
:param audit_options: Configuration for the audit :param audit_options: Configuration for the audit
:type audit_options: Config :type audit_options: Config
:rtype: Dict[str, str]
""" """
errors = {} errors = {}
results = {} results = {}
for name, audit in sorted(_audits.items()): for name, audit in sorted(_audits.items()):
result_name = name.replace('_', '-') result_name = name.replace('_', '-')
if result_name in audit_options.get('excludes', []):
print(
"Skipping {} because it is"
"excluded in audit config"
.format(result_name))
continue
if all(p(audit_options) for p in audit.filters): if all(p(audit_options) for p in audit.filters):
try: try:
audit.func(audit_options) audit.func(audit_options)
@ -121,7 +193,13 @@ def run(audit_options):
def action_parse_results(result): def action_parse_results(result):
"""Parse the result of `run` in the context of an action.""" """Parse the result of `run` in the context of an action.
:param result: The result of running the security-checklist
action on a unit
:type result: Dict[str, Dict[str, str]]
:rtype: int
"""
passed = True passed = True
for test, result in result.items(): for test, result in result.items():
if result['success']: if result['success']:

View File

@ -194,7 +194,7 @@ SWIFT_CODENAMES = OrderedDict([
('rocky', ('rocky',
['2.18.0', '2.19.0']), ['2.18.0', '2.19.0']),
('stein', ('stein',
['2.19.0']), ['2.20.0']),
]) ])
# >= Liberty version->codename mapping # >= Liberty version->codename mapping

View File

@ -582,21 +582,24 @@ def remove_pool_snapshot(service, pool_name, snapshot_name):
raise raise
# max_bytes should be an int or long def set_pool_quota(service, pool_name, max_bytes=None, max_objects=None):
def set_pool_quota(service, pool_name, max_bytes):
""" """
:param service: six.string_types. The Ceph user name to run the command under :param service: The Ceph user name to run the command under
:param pool_name: six.string_types :type service: str
:param max_bytes: int or long :param pool_name: Name of pool
:return: None. Can raise CalledProcessError :type pool_name: str
:param max_bytes: Maximum bytes quota to apply
:type max_bytes: int
:param max_objects: Maximum objects quota to apply
:type max_objects: int
:raises: subprocess.CalledProcessError
""" """
# Set a byte quota on a RADOS pool in ceph. cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name]
cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name, if max_bytes:
'max_bytes', str(max_bytes)] cmd = cmd + ['max_bytes', str(max_bytes)]
try: if max_objects:
cmd = cmd + ['max_objects', str(max_objects)]
check_call(cmd) check_call(cmd)
except CalledProcessError:
raise
def remove_pool_quota(service, pool_name): def remove_pool_quota(service, pool_name):
@ -1153,19 +1156,46 @@ class CephBrokerRq(object):
def add_op_create_pool(self, name, replica_count=3, pg_num=None, def add_op_create_pool(self, name, replica_count=3, pg_num=None,
weight=None, group=None, namespace=None, weight=None, group=None, namespace=None,
app_name=None): app_name=None, max_bytes=None, max_objects=None):
"""Adds an operation to create a pool. """DEPRECATED: Use ``add_op_create_replicated_pool()`` or
``add_op_create_erasure_pool()`` instead.
"""
return self.add_op_create_replicated_pool(
name, replica_count=replica_count, pg_num=pg_num, weight=weight,
group=group, namespace=namespace, app_name=app_name,
max_bytes=max_bytes, max_objects=max_objects)
@param pg_num setting: optional setting. If not provided, this value def add_op_create_replicated_pool(self, name, replica_count=3, pg_num=None,
will be calculated by the broker based on how many OSDs are in the weight=None, group=None, namespace=None,
cluster at the time of creation. Note that, if provided, this value app_name=None, max_bytes=None,
will be capped at the current available maximum. max_objects=None):
@param weight: the percentage of data the pool makes up """Adds an operation to create a replicated pool.
:param name: Name of pool to create
:type name: str
:param replica_count: Number of copies Ceph should keep of your data.
:type replica_count: int
:param pg_num: Request specific number of Placement Groups to create
for pool.
:type pg_num: int
:param weight: The percentage of data that is expected to be contained
in the pool from the total available space on the OSDs.
Used to calculate number of Placement Groups to create
for pool.
:type weight: float
:param group: Group to add pool to
:type group: str
:param namespace: Group namespace
:type namespace: str
:param app_name: (Optional) Tag pool with application name. Note that :param app_name: (Optional) Tag pool with application name. Note that
there is certain protocols emerging upstream with there is certain protocols emerging upstream with
regard to meaningful application names to use. regard to meaningful application names to use.
Examples are ``rbd`` and ``rgw``. Examples are ``rbd`` and ``rgw``.
:type app_name: str :type app_name: str
:param max_bytes: Maximum bytes quota to apply
:type max_bytes: int
:param max_objects: Maximum objects quota to apply
:type max_objects: int
""" """
if pg_num and weight: if pg_num and weight:
raise ValueError('pg_num and weight are mutually exclusive') raise ValueError('pg_num and weight are mutually exclusive')
@ -1173,7 +1203,41 @@ class CephBrokerRq(object):
self.ops.append({'op': 'create-pool', 'name': name, self.ops.append({'op': 'create-pool', 'name': name,
'replicas': replica_count, 'pg_num': pg_num, 'replicas': replica_count, 'pg_num': pg_num,
'weight': weight, 'group': group, 'weight': weight, 'group': group,
'group-namespace': namespace, 'app-name': app_name}) 'group-namespace': namespace, 'app-name': app_name,
'max-bytes': max_bytes, 'max-objects': max_objects})
def add_op_create_erasure_pool(self, name, erasure_profile=None,
weight=None, group=None, app_name=None,
max_bytes=None, max_objects=None):
"""Adds an operation to create a erasure coded pool.
:param name: Name of pool to create
:type name: str
:param erasure_profile: Name of erasure code profile to use. If not
set the ceph-mon unit handling the broker
request will set its default value.
:type erasure_profile: str
:param weight: The percentage of data that is expected to be contained
in the pool from the total available space on the OSDs.
:type weight: float
:param group: Group to add pool to
:type group: str
:param app_name: (Optional) Tag pool with application name. Note that
there is certain protocols emerging upstream with
regard to meaningful application names to use.
Examples are ``rbd`` and ``rgw``.
:type app_name: str
:param max_bytes: Maximum bytes quota to apply
:type max_bytes: int
:param max_objects: Maximum objects quota to apply
:type max_objects: int
"""
self.ops.append({'op': 'create-pool', 'name': name,
'pool-type': 'erasure',
'erasure-profile': erasure_profile,
'weight': weight,
'group': group, 'app-name': app_name,
'max-bytes': max_bytes, 'max-objects': max_objects})
def set_ops(self, ops): def set_ops(self, ops):
"""Set request ops to provided value. """Set request ops to provided value.

View File

@ -50,6 +50,11 @@ TRACE = "TRACE"
MARKER = object() MARKER = object()
SH_MAX_ARG = 131071 SH_MAX_ARG = 131071
RANGE_WARNING = ('Passing NO_PROXY string that includes a cidr. '
'This may not be compatible with software you are '
'running in your shell.')
cache = {} cache = {}
@ -1414,3 +1419,72 @@ def unit_doomed(unit=None):
# I don't think 'dead' units ever show up in the goal-state, but # I don't think 'dead' units ever show up in the goal-state, but
# check anyway in addition to 'dying'. # check anyway in addition to 'dying'.
return units[unit]['status'] in ('dying', 'dead') return units[unit]['status'] in ('dying', 'dead')
def env_proxy_settings(selected_settings=None):
"""Get proxy settings from process environment variables.
Get charm proxy settings from environment variables that correspond to
juju-http-proxy, juju-https-proxy and juju-no-proxy (available as of 2.4.2,
see lp:1782236) in a format suitable for passing to an application that
reacts to proxy settings passed as environment variables. Some applications
support lowercase or uppercase notation (e.g. curl), some support only
lowercase (e.g. wget), there are also subjectively rare cases of only
uppercase notation support. no_proxy CIDR and wildcard support also varies
between runtimes and applications as there is no enforced standard.
Some applications may connect to multiple destinations and expose config
options that would affect only proxy settings for a specific destination
these should be handled in charms in an application-specific manner.
:param selected_settings: format only a subset of possible settings
:type selected_settings: list
:rtype: Option(None, dict[str, str])
"""
SUPPORTED_SETTINGS = {
'http': 'HTTP_PROXY',
'https': 'HTTPS_PROXY',
'no_proxy': 'NO_PROXY',
'ftp': 'FTP_PROXY'
}
if selected_settings is None:
selected_settings = SUPPORTED_SETTINGS
selected_vars = [v for k, v in SUPPORTED_SETTINGS.items()
if k in selected_settings]
proxy_settings = {}
for var in selected_vars:
var_val = os.getenv(var)
if var_val:
proxy_settings[var] = var_val
proxy_settings[var.lower()] = var_val
# Now handle juju-prefixed environment variables. The legacy vs new
# environment variable usage is mutually exclusive
charm_var_val = os.getenv('JUJU_CHARM_{}'.format(var))
if charm_var_val:
proxy_settings[var] = charm_var_val
proxy_settings[var.lower()] = charm_var_val
if 'no_proxy' in proxy_settings:
if _contains_range(proxy_settings['no_proxy']):
log(RANGE_WARNING, level=WARNING)
return proxy_settings if proxy_settings else None
def _contains_range(addresses):
"""Check for cidr or wildcard domain in a string.
Given a string comprising a comma seperated list of ip addresses
and domain names, determine whether the string contains IP ranges
or wildcard domains.
:param addresses: comma seperated list of domains and ip addresses.
:type addresses: str
"""
return (
# Test for cidr (e.g. 10.20.20.0/24)
"/" in addresses or
# Test for wildcard domains (*.foo.com or .foo.com)
"*" in addresses or
addresses.startswith(".") or
",." in addresses or
" ." in addresses)

View File

@ -19,15 +19,16 @@ import re
import six import six
import time import time
import subprocess import subprocess
from tempfile import NamedTemporaryFile
from charmhelpers.core.host import ( from charmhelpers.core.host import (
lsb_release get_distrib_codename,
CompareHostReleases,
) )
from charmhelpers.core.hookenv import ( from charmhelpers.core.hookenv import (
log, log,
DEBUG, DEBUG,
WARNING, WARNING,
env_proxy_settings,
) )
from charmhelpers.fetch import SourceConfigError, GPGKeyError from charmhelpers.fetch import SourceConfigError, GPGKeyError
@ -303,12 +304,17 @@ def import_key(key):
"""Import an ASCII Armor key. """Import an ASCII Armor key.
A Radix64 format keyid is also supported for backwards A Radix64 format keyid is also supported for backwards
compatibility, but should never be used; the key retrieval compatibility. In this case Ubuntu keyserver will be
mechanism is insecure and subject to man-in-the-middle attacks queried for a key via HTTPS by its keyid. This method
voiding all signature checks using that key. is less preferrable because https proxy servers may
require traffic decryption which is equivalent to a
man-in-the-middle attack (a proxy server impersonates
keyserver TLS certificates and has to be explicitly
trusted by the system).
:param keyid: The key in ASCII armor format, :param key: A GPG key in ASCII armor format,
including BEGIN and END markers. including BEGIN and END markers or a keyid.
:type key: (bytes, str)
:raises: GPGKeyError if the key could not be imported :raises: GPGKeyError if the key could not be imported
""" """
key = key.strip() key = key.strip()
@ -319,35 +325,137 @@ def import_key(key):
log("PGP key found (looks like ASCII Armor format)", level=DEBUG) log("PGP key found (looks like ASCII Armor format)", level=DEBUG)
if ('-----BEGIN PGP PUBLIC KEY BLOCK-----' in key and if ('-----BEGIN PGP PUBLIC KEY BLOCK-----' in key and
'-----END PGP PUBLIC KEY BLOCK-----' in key): '-----END PGP PUBLIC KEY BLOCK-----' in key):
log("Importing ASCII Armor PGP key", level=DEBUG) log("Writing provided PGP key in the binary format", level=DEBUG)
with NamedTemporaryFile() as keyfile: if six.PY3:
with open(keyfile.name, 'w') as fd: key_bytes = key.encode('utf-8')
fd.write(key) else:
fd.write("\n") key_bytes = key
cmd = ['apt-key', 'add', keyfile.name] key_name = _get_keyid_by_gpg_key(key_bytes)
try: key_gpg = _dearmor_gpg_key(key_bytes)
subprocess.check_call(cmd) _write_apt_gpg_keyfile(key_name=key_name, key_material=key_gpg)
except subprocess.CalledProcessError:
error = "Error importing PGP key '{}'".format(key)
log(error)
raise GPGKeyError(error)
else: else:
raise GPGKeyError("ASCII armor markers missing from GPG key") raise GPGKeyError("ASCII armor markers missing from GPG key")
else: else:
# We should only send things obviously not a keyid offsite
# via this unsecured protocol, as it may be a secret or part
# of one.
log("PGP key found (looks like Radix64 format)", level=WARNING) log("PGP key found (looks like Radix64 format)", level=WARNING)
log("INSECURLY importing PGP key from keyserver; " log("SECURELY importing PGP key from keyserver; "
"full key not provided.", level=WARNING) "full key not provided.", level=WARNING)
cmd = ['apt-key', 'adv', '--keyserver', # as of bionic add-apt-repository uses curl with an HTTPS keyserver URL
'hkp://keyserver.ubuntu.com:80', '--recv-keys', key] # to retrieve GPG keys. `apt-key adv` command is deprecated as is
try: # apt-key in general as noted in its manpage. See lp:1433761 for more
_run_with_retries(cmd) # history. Instead, /etc/apt/trusted.gpg.d is used directly to drop
except subprocess.CalledProcessError: # gpg
error = "Error importing PGP key '{}'".format(key) key_asc = _get_key_by_keyid(key)
log(error) # write the key in GPG format so that apt-key list shows it
raise GPGKeyError(error) key_gpg = _dearmor_gpg_key(key_asc)
_write_apt_gpg_keyfile(key_name=key, key_material=key_gpg)
def _get_keyid_by_gpg_key(key_material):
"""Get a GPG key fingerprint by GPG key material.
Gets a GPG key fingerprint (40-digit, 160-bit) by the ASCII armor-encoded
or binary GPG key material. Can be used, for example, to generate file
names for keys passed via charm options.
:param key_material: ASCII armor-encoded or binary GPG key material
:type key_material: bytes
:raises: GPGKeyError if invalid key material has been provided
:returns: A GPG key fingerprint
:rtype: str
"""
# trusty, xenial and bionic handling differs due to gpg 1.x to 2.x change
release = get_distrib_codename()
is_gpgv2_distro = CompareHostReleases(release) >= "bionic"
if is_gpgv2_distro:
# --import is mandatory, otherwise fingerprint is not printed
cmd = 'gpg --with-colons --import-options show-only --import --dry-run'
else:
cmd = 'gpg --with-colons --with-fingerprint'
ps = subprocess.Popen(cmd.split(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE)
out, err = ps.communicate(input=key_material)
if six.PY3:
out = out.decode('utf-8')
err = err.decode('utf-8')
if 'gpg: no valid OpenPGP data found.' in err:
raise GPGKeyError('Invalid GPG key material provided')
# from gnupg2 docs: fpr :: Fingerprint (fingerprint is in field 10)
return re.search(r"^fpr:{9}([0-9A-F]{40}):$", out, re.MULTILINE).group(1)
def _get_key_by_keyid(keyid):
"""Get a key via HTTPS from the Ubuntu keyserver.
Different key ID formats are supported by SKS keyservers (the longer ones
are more secure, see "dead beef attack" and https://evil32.com/). Since
HTTPS is used, if SSLBump-like HTTPS proxies are in place, they will
impersonate keyserver.ubuntu.com and generate a certificate with
keyserver.ubuntu.com in the CN field or in SubjAltName fields of a
certificate. If such proxy behavior is expected it is necessary to add the
CA certificate chain containing the intermediate CA of the SSLBump proxy to
every machine that this code runs on via ca-certs cloud-init directive (via
cloudinit-userdata model-config) or via other means (such as through a
custom charm option). Also note that DNS resolution for the hostname in a
URL is done at a proxy server - not at the client side.
8-digit (32 bit) key ID
https://keyserver.ubuntu.com/pks/lookup?search=0x4652B4E6
16-digit (64 bit) key ID
https://keyserver.ubuntu.com/pks/lookup?search=0x6E85A86E4652B4E6
40-digit key ID:
https://keyserver.ubuntu.com/pks/lookup?search=0x35F77D63B5CEC106C577ED856E85A86E4652B4E6
:param keyid: An 8, 16 or 40 hex digit keyid to find a key for
:type keyid: (bytes, str)
:returns: A key material for the specified GPG key id
:rtype: (str, bytes)
:raises: subprocess.CalledProcessError
"""
# options=mr - machine-readable output (disables html wrappers)
keyserver_url = ('https://keyserver.ubuntu.com'
'/pks/lookup?op=get&options=mr&exact=on&search=0x{}')
curl_cmd = ['curl', keyserver_url.format(keyid)]
# use proxy server settings in order to retrieve the key
return subprocess.check_output(curl_cmd,
env=env_proxy_settings(['https']))
def _dearmor_gpg_key(key_asc):
"""Converts a GPG key in the ASCII armor format to the binary format.
:param key_asc: A GPG key in ASCII armor format.
:type key_asc: (str, bytes)
:returns: A GPG key in binary format
:rtype: (str, bytes)
:raises: GPGKeyError
"""
ps = subprocess.Popen(['gpg', '--dearmor'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE)
out, err = ps.communicate(input=key_asc)
# no need to decode output as it is binary (invalid utf-8), only error
if six.PY3:
err = err.decode('utf-8')
if 'gpg: no valid OpenPGP data found.' in err:
raise GPGKeyError('Invalid GPG key material. Check your network setup'
' (MTU, routing, DNS) and/or proxy server settings'
' as well as destination keyserver status.')
else:
return out
def _write_apt_gpg_keyfile(key_name, key_material):
"""Writes GPG key material into a file at a provided path.
:param key_name: A key name to use for a key file (could be a fingerprint)
:type key_name: str
:param key_material: A GPG key material (binary)
:type key_material: (str, bytes)
"""
with open('/etc/apt/trusted.gpg.d/{}.gpg'.format(key_name),
'wb') as keyf:
keyf.write(key_material)
def add_source(source, key=None, fail_invalid=False): def add_source(source, key=None, fail_invalid=False):
@ -442,13 +550,13 @@ def add_source(source, key=None, fail_invalid=False):
def _add_proposed(): def _add_proposed():
"""Add the PROPOSED_POCKET as /etc/apt/source.list.d/proposed.list """Add the PROPOSED_POCKET as /etc/apt/source.list.d/proposed.list
Uses lsb_release()['DISTRIB_CODENAME'] to determine the correct staza for Uses get_distrib_codename to determine the correct stanza for
the deb line. the deb line.
For intel architecutres PROPOSED_POCKET is used for the release, but for For intel architecutres PROPOSED_POCKET is used for the release, but for
other architectures PROPOSED_PORTS_POCKET is used for the release. other architectures PROPOSED_PORTS_POCKET is used for the release.
""" """
release = lsb_release()['DISTRIB_CODENAME'] release = get_distrib_codename()
arch = platform.machine() arch = platform.machine()
if arch not in six.iterkeys(ARCH_TO_PROPOSED_POCKET): if arch not in six.iterkeys(ARCH_TO_PROPOSED_POCKET):
raise SourceConfigError("Arch {} not supported for (distro-)proposed" raise SourceConfigError("Arch {} not supported for (distro-)proposed"
@ -461,11 +569,16 @@ def _add_apt_repository(spec):
"""Add the spec using add_apt_repository """Add the spec using add_apt_repository
:param spec: the parameter to pass to add_apt_repository :param spec: the parameter to pass to add_apt_repository
:type spec: str
""" """
if '{series}' in spec: if '{series}' in spec:
series = lsb_release()['DISTRIB_CODENAME'] series = get_distrib_codename()
spec = spec.replace('{series}', series) spec = spec.replace('{series}', series)
_run_with_retries(['add-apt-repository', '--yes', spec]) # software-properties package for bionic properly reacts to proxy settings
# passed as environment variables (See lp:1433761). This is not the case
# LTS and non-LTS releases below bionic.
_run_with_retries(['add-apt-repository', '--yes', spec],
cmd_env=env_proxy_settings(['https']))
def _add_cloud_pocket(pocket): def _add_cloud_pocket(pocket):
@ -534,7 +647,7 @@ def _verify_is_ubuntu_rel(release, os_release):
:raises: SourceConfigError if the release is not the same as the ubuntu :raises: SourceConfigError if the release is not the same as the ubuntu
release. release.
""" """
ubuntu_rel = lsb_release()['DISTRIB_CODENAME'] ubuntu_rel = get_distrib_codename()
if release != ubuntu_rel: if release != ubuntu_rel:
raise SourceConfigError( raise SourceConfigError(
'Invalid Cloud Archive release specified: {}-{} on this Ubuntu' 'Invalid Cloud Archive release specified: {}-{} on this Ubuntu'

View File

@ -464,6 +464,17 @@ class GlanceBasicDeployment(OpenStackAmuletDeployment):
self.d.configure(juju_service, set_default) self.d.configure(juju_service, set_default)
def test_500_security_checklist_action(self):
"""Verify expected result on a default install"""
u.log.debug("Testing security-checklist")
sentry_unit = self.glance_sentry
action_id = u.run_action(sentry_unit, "security-checklist")
u.wait_on_action(action_id)
data = amulet.actions.get_action_output(action_id, full_output=True)
assert data.get(u"status") == "failed", \
"Security check is expected to not pass by default"
def test_900_glance_restart_on_config_change(self): def test_900_glance_restart_on_config_change(self):
"""Verify that the specified services are restarted when the config """Verify that the specified services are restarted when the config
is changed.""" is changed."""