Make check_latest_packages_version roles more generic

This patch removes reference to TripleO packages.

It also brings a better testing coverage through Molecule.

This patch also refactors the module itself, so it now process
the packages in one go, and reports results to ansible.

Futhermore, validation will now check for appropriate package
arhchitecture, instead of assuming they are all the same.

Validation now also determines the package manager itself,
assuming it isn't provided one by ansible.
Default fact gathering is thus no longer needed,
potentially saving storage and improving performance.

Change-Id: I5427523f6397f30538cdebabdfc22bc547c2580e
Signed-off-by: Gael Chamoulaud (Strider) <gchamoul@redhat.com>
Signed-off-by: Jiri Podivin <jpodivin@redhat.com>
This commit is contained in:
Gael Chamoulaud (Strider) 2021-02-09 11:31:32 +01:00 committed by Jiri Podivin
parent ab26d8263a
commit c6503e5105
9 changed files with 543 additions and 127 deletions

@ -13,7 +13,23 @@
# License for the specific language governing permissions and limitations
# under the License.
""" Check for available updates for a given package."""
"""Check for available updates for a given package.
Module queries and parses output of at least two separate
external binaries, in order to obtain information about
supported package manager, installed and available packages.
As such it has many points of failure.
Information about supported package managers,
such as the commands to use while working with them
and the expected stderr output we can encounter while querying repos,
are stored as a nested dictionery SUPPORTED_PKG_MGRS.
With names of the supported package managers as keys
of the first level elements. And the aformentioned information
on the second level, as lists of strings, with self-explanatory keys.
Formally speaking it is a tree of a sort.
But so is entire python namespace.
"""
import collections
import subprocess
@ -24,21 +40,23 @@ from yaml import safe_load as yaml_safe_load
DOCUMENTATION = '''
---
module: check_package_update
short_description: Check for available updates for a given package
short_description: Check for available updates for given packages
description:
- Check for available updates for a given package
- Check for available updates for given packages
options:
package:
packages_list:
required: true
description:
- The name of the package you want to check
type: str
- The names of the packages you want to check
type: list
pkg_mgr:
required: true
required: false
description:
- Supported Package Manager, DNF or YUM
type: str
author: "Florian Fuchs"
author:
- Florian Fuchs
- Jiri Podivin (@jpodivin)
'''
EXAMPLES = '''
@ -46,88 +64,239 @@ EXAMPLES = '''
tasks:
- name: Get available updates for packages
check_package_update:
package: python-tripleoclient
pkg_mgr: "{{ ansible_pkg_mgr}}"
packages_list:
- coreutils
- wget
pkg_mgr: "{{ ansible_pkg_mgr }}"
'''
SUPPORTED_PKG_MGRS = (
'yum',
'dnf',
)
SUPPORTED_PKG_MGRS = {
'dnf': {
'query_installed': [
'rpm', '-qa', '--qf',
'%{NAME}|%{VERSION}|%{RELEASE}|%{ARCH}\n'
],
'query_available': [
'dnf', '-q', 'list', '--available'
],
'allowed_errors': [
'',
'Error: No matching Packages to list\n'
]
},
'yum': {
'query_installed': [
'rpm', '-qa', '--qf',
'%{NAME}|%{VERSION}|%{RELEASE}|%{ARCH}\n'
],
'query_available': [
'yum', '-q', 'list', 'available'
],
'allowed_errors': [
'',
'Error: No matching Packages to list\n'
]
},
}
PackageDetails = collections.namedtuple('PackageDetails',
['name', 'version', 'release', 'arch'])
PackageDetails = collections.namedtuple(
'PackageDetails',
['name', 'version', 'release', 'arch'])
def get_package_details(output):
if output:
return PackageDetails(
output.split('|')[0],
output.split('|')[1],
output.split('|')[2],
output.split('|')[3],
def get_package_details(pkg_details_string):
"""Returns PackageDetails namedtuple from given string.
Raises ValueError if the number of '|' separated
fields is < 4.
"""
split_output = pkg_details_string.split('|')
try:
pkg_details = PackageDetails(
split_output[0],
split_output[1],
split_output[2],
split_output[3],
)
except IndexError:
raise ValueError(
(
"Package description '{}' doesn't contain fields"
" required for processing."
).format(pkg_details_string)
)
return pkg_details
def _allowed_pkg_manager_stderr(stderr, allowed_errors):
"""Returns False if the error message isn't in the
allowed_errors list.
This function factors out large, and possibly expanding,
condition so it doesn't cause too much confusion.
"""
if stderr in allowed_errors:
return True
return False
def _command(command):
# Return the result of a subprocess call
# as [stdout, stderr]
process = subprocess.Popen(command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True)
"""
:returns: the result of a subprocess call
as a tuple (stdout, stderr).
"""
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True)
return process.communicate()
def check_update(module, package, pkg_mgr):
def _get_pkg_manager(module):
"""Return name of available package manager.
Queries binaries using `command -v`, in order defined by
the `SUPPORTED_PKG_MGRS`.
:returns: string
"""
for possible_pkg_mgr in SUPPORTED_PKG_MGRS:
stdout, stderr = _command(['command', '-v', possible_pkg_mgr])
if stdout != '' and stderr == '':
return possible_pkg_mgr
module.fail_json(
msg=(
"None of the supported package managers '{}' seems to be "
"available on this system."
).format(' '.join(SUPPORTED_PKG_MGRS))
)
def _get_new_pkg_info(available_stdout):
"""Return package information as dictionary. With package names
as keys and detailed information as list of strings.
"""
available_stdout = available_stdout.split('\n')[1:]
available_stdout = [line.rstrip().split() for line in available_stdout]
new_pkgs_info = {}
for line in available_stdout:
if len(line) != 0:
new_pkgs_info[line[0]] = PackageDetails(
line[0],
line[1].split('-')[0],
line[1].split('-')[1],
line[0].split('.')[1])
return new_pkgs_info
def _get_installed_pkgs(installed_stdout, packages, module):
"""Return dictionary of installed packages.
Package names form keys and the output of the get_package_details
function values of the dictionary.
"""
installed = {}
installed_stdout = installed_stdout.split('\n')[:-1]
for package in installed_stdout:
if package != '':
package = get_package_details(package)
if package.name in packages:
installed[package.name + '.' + package.arch] = package
packages.remove(package.name)
#Once find all the requested packages we don't need to continue search
if len(packages) == 0:
break
#Even a single missing package is a reason for failure.
if len(packages) > 0:
msg = "Following packages are not installed {}".format(packages)
module.fail_json(
msg=msg
)
return
return installed
def check_update(module, packages_list, pkg_mgr):
"""Check if the packages in the 'packages_list are up to date.
Queries binaries, defined the in relevant SUPPORTED_PKG_MGRS entry,
to obtain information about present and available packages.
"""
if len(packages_list) == 0:
module.fail_json(
msg="No packages given to check.")
return
if pkg_mgr is None:
pkg_mgr = _get_pkg_manager(module=module)
if pkg_mgr not in SUPPORTED_PKG_MGRS:
module.fail_json(
msg='Package manager "{}" is not supported.'.format(pkg_mgr))
return
installed_stdout, installed_stderr = _command(
['rpm', '-qa', '--qf',
'%{NAME}|%{VERSION}|%{RELEASE}|%{ARCH}',
package])
pkg_mgr = SUPPORTED_PKG_MGRS[pkg_mgr]
installed_stdout, installed_stderr = _command(pkg_mgr['query_installed'])
# Fail the module if for some reason we can't lookup the current package.
if installed_stderr != '':
module.fail_json(msg=installed_stderr)
return
elif not installed_stdout:
if not installed_stdout:
module.fail_json(
msg='"{}" is not an installed package.'.format(package))
msg='no output returned for the query.{}'.format(
' '.join(pkg_mgr['query_installed'])
))
return
installed = get_package_details(installed_stdout)
installed = _get_installed_pkgs(installed_stdout, packages_list, module)
pkg_mgr_option = 'available'
if pkg_mgr == 'dnf':
pkg_mgr_option = '--available'
installed_pkg_names = ' '.join(installed)
available_stdout, available_stderr = _command(
[pkg_mgr, '-q', 'list', pkg_mgr_option, installed.name])
pkg_mgr['query_available'].append(installed_pkg_names)
available_stdout, available_stderr = _command(pkg_mgr['query_available'])
#We need to check that the stderr consists only of the expected strings
#This can get complicated if the CLI on the pkg manager side changes.
if not _allowed_pkg_manager_stderr(available_stderr, pkg_mgr['allowed_errors']):
module.fail_json(msg=available_stderr)
return
if available_stdout:
new_pkg_info = available_stdout.split('\n')[1].rstrip().split()[:2]
new_ver, new_rel = new_pkg_info[1].split('-')
module.exit_json(
changed=False,
name=installed.name,
current_version=installed.version,
current_release=installed.release,
new_version=new_ver,
new_release=new_rel)
new_pkgs_info = _get_new_pkg_info(available_stdout)
else:
module.exit_json(
changed=False,
name=installed.name,
current_version=installed.version,
current_release=installed.release,
new_version=None,
new_release=None)
new_pkgs_info = {}
results = []
for installed_pkg in installed:
results.append(
{
'name': installed_pkg,
'current_version': installed[installed_pkg].version,
'current_release': installed[installed_pkg].release,
'new_version': None,
'new_release': None
}
)
if installed_pkg in new_pkgs_info:
results[-1]['new_version'] = new_pkgs_info[installed_pkg][1]
results[-1]['new_release'] = new_pkgs_info[installed_pkg][2]
module.exit_json(
changed=False,
outdated_pkgs=results
)
def main():
@ -135,9 +304,10 @@ def main():
argument_spec=yaml_safe_load(DOCUMENTATION)['options']
)
check_update(module,
module.params.get('package'),
module.params.get('pkg_mgr'))
check_update(
module,
packages_list=module.params.get('packages_list'),
pkg_mgr=module.params.get('pkg_mgr', None))
if __name__ == '__main__':

@ -1,10 +1,2 @@
---
tripleoclient: >-
{%- if ansible_distribution == 'RedHat' and ansible_distribution_major_version == '8' -%}
python3-tripleoclient
{%- else -%}
python2-tripleoclient
{%- endif -%}
packages_list:
- "{{ tripleoclient }}"
packages_list: []

@ -19,27 +19,64 @@
hosts: all
tasks:
- name: Validate No Available Update for patch rpm
include_role:
name: check_latest_packages_version
vars:
packages_list:
- patch
- name: Working Detection of Update for Pam package
- name: Run validation with empty package list
block:
- include_role:
name: check_latest_packages_version
vars:
packages_list:
- pam
rescue:
- name: Clear host errors
meta: clear_host_errors
- debug:
msg: The validation works! End the playbook run
msg: |
The validation fails due to an empty package list
given as parameter.
- name: Working Detection of Update for valfrwk-release package
block:
- include_role:
name: check_latest_packages_version
vars:
packages_list:
- valfrwk-release
rescue:
- name: Clear host errors
meta: clear_host_errors
- debug:
msg: The validation has detected a new version!
- name: Validate No Available Update for valfrwk-release rpm
block:
- name: Update valfrwk-release rpm to the latest one
package:
name: valfrwk-release
state: latest
- include_role:
name: check_latest_packages_version
vars:
packages_list:
- valfrwk-release
- name: Working Detection of Update for an uninstalled package
block:
- include_role:
name: check_latest_packages_version
vars:
packages_list:
- whatchamacallit
rescue:
- name: Clear host errors
meta: clear_host_errors
- debug:
msg: |
The validation fails because 'whatchamacallit' rpm is not
installed! End the playbook run
- name: End play
meta: end_play

@ -20,6 +20,41 @@
gather_facts: false
tasks:
- name: install patch rpm
- name: Create /opt/valfrwk directory
file:
path: '/opt/valfrwk'
state: directory
mode: "0777"
- name: Copy valfrwk-release packages
copy:
src: "{{ item }}"
dest: /opt/valfrwk
with_items:
- ./valfrwk-release-1.0.0-1.20210331045404.4c29590.el8.x86_64.rpm
- ./valfrwk-release-1.0.0-2.20210401064344.c8ee186.el8.x86_64.rpm
- ./valfrwk-release-1.0.1-1.20210401074356.drh345o.el8.x86_64.rpm
- name: Install createrepo rpm
package:
name: patch
name: createrepo
state: present
- name: Generate yum repo
command: >-
createrepo /opt/valfrwk/
args:
warn: false
- name: Add valfrwk yum repository
yum_repository:
name: valfrwk
description: Validations Framework Repo
file: valfrwk
baseurl: file:///opt/valfrwk
gpgcheck: false
- name: Install the oldest valfrwk-release rpm
package:
name: valfrwk-release-1.0.0-1.20210331045404.4c29590.el8
state: present

@ -4,17 +4,10 @@
gather_subset:
- '!all'
- '!min'
- pkg_mgr
- name: Gather package facts
package_facts:
manager: auto
- name: Get available updates for packages
check_package_update:
package: "{{ item }}"
pkg_mgr: "{{ ansible_pkg_mgr }}"
with_items: "{{ packages_list }}"
packages_list: "{{ packages_list }}"
register: updates
- name: Check if current version is the latest one
@ -23,5 +16,5 @@
A newer version of the {{ item.name }} package is
available: {{ item.new_version }}-{{ item.new_release }}
(currently {{ item.current_version }}-{{ item.current_release }})
with_items: "{{ updates.results }}"
with_items: "{{ updates.outdated_pkgs }}"
when: item.new_version

@ -12,16 +12,16 @@
# License for the specific language governing permissions and limitations
# under the License.
from unittest.mock import MagicMock
from unittest.mock import patch
import subprocess
from unittest import mock
from validations_common.library import check_package_update as cppkg
from validations_common.library.check_package_update import check_update
from validations_common.library.check_package_update import get_package_details
from validations_common.tests import base
PKG_INSTALLED = "foo-package|6.1.5|1|x86_64"
PKG_INVALID = "foo-package|6.1.5|x86_64"
PKG_AVAILABLE = """\
Available Packages
foo-package.x86_64 8.0.0-1 foo-stable
@ -31,65 +31,254 @@ foo-package.x86_64 8.0.0-1 foo-stable
class TestGetPackageDetails(base.TestCase):
def setUp(self):
super(TestGetPackageDetails, self).setUp()
self.entry = get_package_details("foo-package|6.2.0|1|x86_64")
self.entry = PKG_INSTALLED
self.invalid_pkg = PKG_INVALID
def test_name(self):
self.assertEqual(self.entry.name, 'foo-package')
details = cppkg.get_package_details(self.entry)
self.assertEqual(details.name, 'foo-package')
def test_arch(self):
self.assertEqual(self.entry.arch, 'x86_64')
details = cppkg.get_package_details(self.entry)
self.assertEqual(details.arch, 'x86_64')
def test_version(self):
self.assertEqual(self.entry.version, '6.2.0')
details = cppkg.get_package_details(self.entry)
self.assertEqual(details.version, '6.1.5')
def test_release(self):
self.assertEqual(self.entry.release, '1')
details = cppkg.get_package_details(self.entry)
self.assertEqual(details.release, '1')
def test_index_error(self):
self.assertRaises(ValueError, cppkg.get_package_details, self.invalid_pkg)
class TestCheckUpdate(base.TestCase):
def setUp(self):
super(TestCheckUpdate, self).setUp()
self.module = MagicMock()
self.module = mock.MagicMock()
self.package_details = cppkg.get_package_details("foo-package|6.1.5|1|x86_64")
def test_empty_pkg_list_fails(self):
cppkg.check_update(self.module, [], 'dnf')
self.module.fail_json.assert_called_once_with(
msg='No packages given to check.')
self.module.reset_mock()
def test_unsupported_pkg_mgr_fails(self):
check_update(self.module, 'foo-package', 'apt')
cppkg.check_update(self.module, ['foo-package'], 'apt')
self.module.fail_json.assert_called_with(
msg='Package manager "apt" is not supported.')
@patch('validations_common.library.check_package_update._command')
self.module.reset_mock()
@mock.patch('validations_common.library.check_package_update._command')
def test_fails_if_installed_package_not_found(self, mock_command):
mock_command.side_effect = [
['', 'No package found.'],
]
check_update(self.module, 'foo-package', 'yum')
cppkg.check_update(self.module, ['foo-package'], 'yum')
self.module.fail_json.assert_called_with(
msg='No package found.')
@patch('validations_common.library.check_package_update._command')
def test_returns_current_and_available_versions(self, mock_command):
self.module.reset_mock()
@mock.patch(
'validations_common.library.check_package_update._get_new_pkg_info',
return_value={
'foo-package.x86_64': cppkg.PackageDetails(
'foo-package.x86_64',
'8.0.0',
'1',
'foo-stable')
}
)
@mock.patch(
'validations_common.library.check_package_update._get_installed_pkgs')
@mock.patch('validations_common.library.check_package_update._command')
def test_returns_current_and_available_versions(self, mock_command,
mock_get_installed, mock_get_new_pkg_info):
mock_command.side_effect = [
[PKG_INSTALLED, ''],
[PKG_AVAILABLE, ''],
]
check_update(self.module, 'foo-package', 'yum')
self.module.exit_json.assert_called_with(changed=False,
name='foo-package',
current_version='6.1.5',
current_release='1',
new_version='8.0.0',
new_release='1')
mock_get_installed.side_effect = [{'foo-package.x86_64': self.package_details}]
@patch('validations_common.library.check_package_update._command')
def test_returns_current_version_if_no_updates(self, mock_command):
cppkg.check_update(self.module, ['foo-package'], 'yum')
mock_get_installed.assert_called_once_with(
PKG_INSTALLED,
['foo-package'],
self.module)
self.module.exit_json.assert_called_with(
changed=False,
outdated_pkgs=[
{
'name': 'foo-package.x86_64',
'current_version': '6.1.5',
'current_release': '1',
'new_version': '8.0.0',
'new_release': '1'
}
])
self.module.reset_mock()
@mock.patch(
'validations_common.library.check_package_update._get_new_pkg_info',
return_value={
'foo-package.x86_64': cppkg.PackageDetails(
'foo-package.x86_64',
'8.0.0',
'1',
'foo-stable')
}
)
@mock.patch(
'validations_common.library.check_package_update._get_installed_pkgs')
@mock.patch('validations_common.library.check_package_update._command')
def test_returns_current_version_if_no_updates(self, mock_command,
mock_get_installed, mock_get_new_pkg_info):
mock_command.side_effect = [
[PKG_INSTALLED, ''],
['', 'No packages found'],
['', 'Error: No matching Packages to list\n'],
]
check_update(self.module, 'foo-package', 'yum')
self.module.exit_json.assert_called_with(changed=False,
name='foo-package',
current_version='6.1.5',
current_release='1',
new_version=None,
new_release=None)
mock_get_installed.side_effect = [{'foo-package.x86_64': self.package_details}]
cppkg.check_update(self.module, ['foo-package'], 'yum')
mock_get_installed.assert_called_once_with(
PKG_INSTALLED,
['foo-package'],
self.module)
self.module.exit_json.assert_called_with(
changed=False,
outdated_pkgs=[
{
'name': 'foo-package.x86_64',
'current_version': '6.1.5',
'current_release': '1',
'new_version': None,
'new_release': None
}
])
self.module.reset_mock()
@mock.patch(
'validations_common.library.check_package_update.subprocess.PIPE')
@mock.patch(
'validations_common.library.check_package_update.subprocess.Popen')
def test_command_rpm_no_process(self, mock_popen, mock_pipe):
cli_command = [
'rpm',
'-qa',
'--qf',
'%{NAME}|%{VERSION}|%{RELEASE}|%{ARCH}\n'
]
command_output = cppkg._command(cli_command)
mock_popen.assert_called_once_with(
cli_command,
stdout=mock_pipe,
stderr=mock_pipe,
universal_newlines=True)
def test_get_new_pkg_info(self):
pkg_info = cppkg._get_new_pkg_info(PKG_AVAILABLE)
self.assertIsInstance(pkg_info, dict)
self.assertTrue('foo-package.x86_64' in pkg_info)
self.assertIsInstance(pkg_info['foo-package.x86_64'], cppkg.PackageDetails)
@mock.patch('validations_common.library.check_package_update._command')
def test_get_pkg_mgr_fail(self, mock_command):
mock_command.side_effect = [
('barSTDOUT', 'fooERROR'),
('', '')
]
pkg_manager = cppkg._get_pkg_manager(self.module)
self.assertEqual(pkg_manager, None)
self.module.fail_json.assert_called_once_with(msg=mock.ANY)
self.module.reset_mock()
@mock.patch('validations_common.library.check_package_update._command')
def test_get_pkg_mgr_succes_dnf(self, mock_command):
mock_command.side_effect = [('fizzSTDOUT', '')]
pkg_manager = cppkg._get_pkg_manager(self.module)
self.assertEqual(pkg_manager, 'dnf')
self.module.fail_json.assert_not_called()
self.module.reset_mock()
@mock.patch('validations_common.library.check_package_update._command')
def test_get_pkg_mgr_succes_yum(self, mock_command):
mock_command.side_effect = [
('barSTDOUT', 'fooERROR'),
('fizzSTDOUT', '')
]
pkg_manager = cppkg._get_pkg_manager(self.module)
self.assertEqual(pkg_manager, 'yum')
self.module.fail_json.assert_not_called()
self.module.reset_mock()
def test_get_installed_pkgs_success(self):
"""Test that _get_installed_pkgs will correctly process
output of rpm, compare it with provided package name list
and return dictionary of PackageDetails.
"""
installed_pkgs = cppkg._get_installed_pkgs(
PKG_INSTALLED + '\n',
['foo-package'],
self.module)
self.assertIsInstance(installed_pkgs, dict)
self.assertIsInstance(installed_pkgs['foo-package.x86_64'], cppkg.PackageDetails)
self.assertEqual(installed_pkgs['foo-package.x86_64'].name, 'foo-package')
self.assertEqual(installed_pkgs['foo-package.x86_64'].arch, 'x86_64')
self.assertEqual(installed_pkgs['foo-package.x86_64'].version, '6.1.5')
self.assertEqual(installed_pkgs['foo-package.x86_64'].release, '1')
self.module.fail_json.assert_not_called()
self.module.reset_mock()
def test_get_installed_pkgs_failure_pkg_missing(self):
cppkg._get_installed_pkgs(
installed_stdout=PKG_INSTALLED + '\n',
packages=['foo-package', 'bar-package'],
module=self.module
)
self.module.fail_json.assert_called_once_with(
msg="Following packages are not installed ['bar-package']"
)
self.module.reset_mock()