316 lines
10 KiB
Python
316 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
|
|
# Kayobe overcloud host configure tests.
|
|
# Uses py.test and TestInfra.
|
|
|
|
import ipaddress
|
|
import os
|
|
import time
|
|
|
|
import distro
|
|
import pytest
|
|
|
|
|
|
def _is_apt():
|
|
info = distro.linux_distribution()
|
|
return info[0].startswith('Ubuntu')
|
|
|
|
|
|
def _is_dnf():
|
|
info = distro.id()
|
|
return info in ['centos', 'rocky']
|
|
|
|
|
|
def _is_dnf_mirror():
|
|
info = distro.id()
|
|
return info == 'centos'
|
|
|
|
|
|
def test_network_ethernet(host):
|
|
interface = host.interface('dummy2')
|
|
assert interface.exists
|
|
assert '192.168.34.1' in interface.addresses
|
|
routes = host.check_output('/sbin/ip route show dev dummy2')
|
|
assert '192.168.40.0/24 via 192.168.34.254' in routes
|
|
|
|
|
|
def test_network_ethernet_vlan(host):
|
|
interface = host.interface('dummy2.42')
|
|
assert interface.exists
|
|
assert '192.168.35.1' in interface.addresses
|
|
assert host.file('/sys/class/net/dummy2.42/lower_dummy2').exists
|
|
# FIXME(bbezak): remove following IF after ansible-role-interfaces
|
|
# receive support for custom routes in NetworkManager
|
|
if not ('centos' in host.system_info.distribution.lower() or
|
|
'rocky' in host.system_info.distribution.lower()):
|
|
routes = host.check_output(
|
|
'/sbin/ip route show dev dummy2.42 table kayobe-test-route-table')
|
|
assert '192.168.40.0/24 via 192.168.35.254' in routes
|
|
rules = host.check_output(
|
|
'/sbin/ip rule show table kayobe-test-route-table')
|
|
expected = 'from 192.168.35.0/24 lookup kayobe-test-route-table'
|
|
assert expected in rules
|
|
|
|
|
|
def test_network_bridge(host):
|
|
interface = host.interface('br0')
|
|
assert interface.exists
|
|
assert '192.168.36.1' in interface.addresses
|
|
ports = ['dummy3', 'dummy4']
|
|
sys_ports = host.check_output('ls -1 /sys/class/net/br0/brif')
|
|
assert sys_ports == "\n".join(ports)
|
|
for port in ports:
|
|
interface = host.interface(port)
|
|
assert interface.exists
|
|
v4_addresses = [a for a in interface.addresses
|
|
if ipaddress.ip_address(a).version == '4']
|
|
assert not v4_addresses
|
|
|
|
|
|
def test_network_bridge_vlan(host):
|
|
interface = host.interface('br0.43')
|
|
assert interface.exists
|
|
assert '192.168.37.1' in interface.addresses
|
|
assert host.file('/sys/class/net/br0.43/lower_br0').exists
|
|
|
|
|
|
def test_network_bond(host):
|
|
interface = host.interface('bond0')
|
|
assert interface.exists
|
|
assert '192.168.38.1' in interface.addresses
|
|
sys_slaves = host.check_output('cat /sys/class/net/bond0/bonding/slaves')
|
|
# Ordering is not guaranteed, so compare sets.
|
|
sys_slaves = set(sys_slaves.split())
|
|
slaves = set(['dummy5', 'dummy6'])
|
|
assert sys_slaves == slaves
|
|
for slave in slaves:
|
|
interface = host.interface(slave)
|
|
assert interface.exists
|
|
assert not interface.addresses
|
|
|
|
|
|
def test_network_bond_vlan(host):
|
|
interface = host.interface('bond0.44')
|
|
assert interface.exists
|
|
assert '192.168.39.1' in interface.addresses
|
|
assert host.file('/sys/class/net/bond0.44/lower_bond0').exists
|
|
|
|
|
|
def test_network_bridge_no_ip(host):
|
|
interface = host.interface('br1')
|
|
assert interface.exists
|
|
assert not '192.168.40.1' in interface.addresses
|
|
|
|
|
|
def test_additional_user_account(host):
|
|
user = host.user("kayobe-test-user")
|
|
assert user.name == "kayobe-test-user"
|
|
assert user.group == "kayobe-test-user"
|
|
assert set(user.groups) == {"kayobe-test-user", "stack"}
|
|
assert user.gecos == "Kayobe test user"
|
|
with host.sudo():
|
|
assert user.password == 'kayobe-test-user-password'
|
|
|
|
|
|
def test_software_RAID(host):
|
|
slaves = host.check_output("ls -1 /sys/class/block/md0/slaves/")
|
|
assert slaves == "loop0\nloop1"
|
|
|
|
|
|
def test_luks(host):
|
|
# blkid returns an emptry string without root permissions
|
|
with host.sudo():
|
|
blkid = host.check_output('blkid /dev/md0')
|
|
assert 'TYPE="crypto_LUKS"' in blkid
|
|
|
|
|
|
def test_sysctls(host):
|
|
assert host.sysctl("fs.mount-max") == 99999
|
|
|
|
|
|
def test_cloud_init_is_disabled(host):
|
|
assert host.file("/etc/cloud/cloud-init.disabled").exists
|
|
|
|
|
|
def test_docker_storage_driver_is_devicemapper(host):
|
|
with host.sudo("stack"):
|
|
info = host.check_output("docker info")
|
|
assert "devicemapper" in info
|
|
|
|
|
|
@pytest.mark.parametrize('user', ['kolla', 'stack'])
|
|
def test_docker_image_download(host, user):
|
|
with host.sudo(user):
|
|
host.check_output("docker pull alpine")
|
|
|
|
|
|
@pytest.mark.parametrize('user', ['kolla', 'stack'])
|
|
def test_docker_container_run(host, user):
|
|
with host.sudo(user):
|
|
host.check_output("docker run --rm alpine /bin/true")
|
|
|
|
|
|
def test_timezone(host):
|
|
status = host.check_output("timedatectl status")
|
|
assert "Pacific/Honolulu" in status
|
|
|
|
|
|
def test_ntp_alternative_services_disabled(host):
|
|
# Tests that we don't have any conflicting NTP servers running
|
|
# NOTE(wszumski): We always mask services even if they don't exist
|
|
ntpd_service = host.service("ntp")
|
|
assert ntpd_service.is_masked
|
|
assert not ntpd_service.is_running
|
|
|
|
timesyncd_service = host.service("systemd-timesyncd")
|
|
assert timesyncd_service.is_masked
|
|
assert not timesyncd_service.is_running
|
|
|
|
|
|
def test_ntp_running(host):
|
|
# Tests that NTP services are enabled and running
|
|
assert host.package("chrony").is_installed
|
|
assert host.service("chronyd").is_enabled
|
|
assert host.service("chronyd").is_running
|
|
|
|
|
|
def test_ntp_non_default_time_server(host):
|
|
# Tests that the NTP pool has been changed from pool.ntp.org to
|
|
# time.cloudflare.com
|
|
if ('centos' in host.system_info.distribution.lower() or
|
|
'rocky' in host.system_info.distribution.lower()):
|
|
chrony_config = host.file("/etc/chrony.conf")
|
|
else:
|
|
# Debian based distributions use the following path
|
|
chrony_config = host.file("/etc/chrony/chrony.conf")
|
|
assert chrony_config.exists
|
|
assert "time.cloudflare.com" in chrony_config.content_string
|
|
|
|
|
|
def test_ntp_clock_synchronized(host):
|
|
# Tests that the clock is synchronized
|
|
status_output = host.check_output("timedatectl status")
|
|
assert "synchronized: yes" in status_output
|
|
|
|
|
|
@pytest.mark.skipif(not _is_apt(), reason="Apt only supported on Ubuntu")
|
|
def test_apt_config(host):
|
|
apt_config = host.file("/etc/apt/apt.conf.d/99retries")
|
|
assert apt_config.exists
|
|
assert apt_config.content_string == "Acquire::Retries 1;\n"
|
|
|
|
|
|
@pytest.mark.skipif(not _is_apt(), reason="Apt only supported on Ubuntu")
|
|
def test_apt_custom_package_repository_is_available(host):
|
|
with host.sudo():
|
|
host.check_output("apt -y install td-agent")
|
|
assert host.package("td-agent").is_installed
|
|
|
|
|
|
@pytest.mark.parametrize('repo', ["appstream", "baseos", "extras", "epel"])
|
|
@pytest.mark.skipif(not _is_dnf_mirror(), reason="DNF OpenDev mirror only for CentOS 8")
|
|
def test_dnf_local_package_mirrors(host, repo):
|
|
# Depends on SITE_MIRROR_FQDN environment variable.
|
|
assert os.getenv('SITE_MIRROR_FQDN')
|
|
# NOTE(mgoddard): Should not require sudo but some files
|
|
# (/var/cache/dnf/expired_repos.json) can have incorrect permissions.
|
|
# https://bugzilla.redhat.com/show_bug.cgi?id=1636909
|
|
with host.sudo():
|
|
info = host.check_output("dnf repoinfo %s", repo)
|
|
assert os.getenv('SITE_MIRROR_FQDN') in info
|
|
|
|
|
|
@pytest.mark.skipif(not _is_dnf(), reason="DNF only supported on CentOS/Rocky")
|
|
def test_dnf_custom_package_repository_is_available(host):
|
|
with host.sudo():
|
|
host.check_output("dnf -y install td-agent")
|
|
assert host.package("td-agent").is_installed
|
|
|
|
|
|
@pytest.mark.skipif(not _is_dnf(), reason="DNF only supported on CentOS/Rocky")
|
|
def test_dnf_automatic(host):
|
|
assert host.package("dnf-automatic").is_installed
|
|
assert host.service("dnf-automatic.timer").is_enabled
|
|
assert host.service("dnf-automatic.timer").is_running
|
|
|
|
|
|
@pytest.mark.skipif(not _is_dnf(),
|
|
reason="tuned profile setting only supported on CentOS/Rocky")
|
|
def test_tuned_profile_is_active(host):
|
|
tuned_output = host.check_output("tuned-adm active")
|
|
assert "throughput-performance" in tuned_output
|
|
|
|
|
|
def test_firewalld_running(host):
|
|
assert host.package("firewalld").is_installed
|
|
assert host.service("firewalld.service").is_enabled
|
|
assert host.service("firewalld.service").is_running
|
|
|
|
|
|
def test_firewalld_zones(host):
|
|
# Verify that interfaces are on correct zones.
|
|
expected_zones = {
|
|
'dummy2.42': 'test-zone1',
|
|
'br0': 'test-zone2',
|
|
'br0.43': 'test-zone3',
|
|
'bond0': 'test-zone3',
|
|
'bond0.44': 'public'
|
|
}
|
|
for interface, expected_zone in expected_zones.items():
|
|
with host.sudo():
|
|
zone = host.check_output(
|
|
"firewall-cmd --get-zone-of-interface %s", interface)
|
|
assert zone == expected_zone
|
|
|
|
zone = host.check_output(
|
|
"firewall-cmd --permanent --get-zone-of-interface %s",
|
|
interface)
|
|
assert zone == expected_zone
|
|
|
|
|
|
def test_firewalld_rules(host):
|
|
# Verify that expected rules are present.
|
|
expected_info = {
|
|
'test-zone1': [
|
|
' services: ',
|
|
' ports: 8080/tcp',
|
|
' icmp-blocks: ',
|
|
],
|
|
'test-zone2': [
|
|
' services: http',
|
|
' ports: ',
|
|
' icmp-blocks: ',
|
|
],
|
|
'test-zone3': [
|
|
' services: ',
|
|
' ports: ',
|
|
' icmp-blocks: echo-request',
|
|
],
|
|
'public': [
|
|
' services: dhcpv6-client ssh',
|
|
' ports: ',
|
|
' icmp-blocks: ',
|
|
],
|
|
}
|
|
|
|
for zone, expected_lines in expected_info.items():
|
|
with host.sudo():
|
|
info = host.check_output(
|
|
"firewall-cmd --info-zone %s", zone)
|
|
info = info.splitlines()
|
|
perm_info = host.check_output(
|
|
"firewall-cmd --permanent --info-zone %s", zone)
|
|
perm_info = perm_info.splitlines()
|
|
|
|
for expected_line in expected_lines:
|
|
assert expected_line in info
|
|
assert expected_line in perm_info
|
|
|
|
|
|
def test_swap(host):
|
|
swapon = host.check_output("swapon -s")
|
|
swapon = swapon.splitlines()
|
|
assert len(swapon) > 1
|
|
swap_devs = [swap.split()[0] for swap in swapon[1:]]
|
|
assert "/swapfile" in swap_devs
|