Add Aetos datasource

Implement the spec for multi-tenancy support for metrics. This adds
a new 'Aetos' datasource very similar to the current Prometheus
datasource. Because of that, the original PrometheusHelper class
was split into two classes and the base class is used for
PrometheusHelper and for AetosHelper. Except for the split, there
is one more change to the original PrometheusHelper class code, which
is the addition and use of the _get_fqdn_label() and
_get_instance_uuid_label() methods.

As part of the change, I refactored the current prometheus datasource
unit tests. Most of them are now used to test the PrometheusBase class
with minimal changes. Changes I've made to the original tests:

- the ones that can be be used to test the base class are moved into the
  TestPrometheusBase class
- the _setup_prometheus_client, _get_instance_uuid_label and
  _get_fqdn_label functions are mocked in the base class tests.
  Their concrete implementations are tested in each datasource tests
  separately.
- a self._create_helper() is used to instantiate the helper class with
  correct mocking.
- all config value modification is the original tests got moved out and
  instead of modifying the config values, the _get_* methods are mocked
  to return the wanted values
- to keep similar test coverage, config retrieval is tested for each
  concrete class by testing the _get_* methods.

New watcher-aetos-integration and watcher-aetos-integration-realdata
zuul jobs are added to test the new datasource. These use the same set
of tempest tests as the current watcher-prometheus-integration jobs.
The only difference is the environment setup and the Watcher config,
so that the job deploys Aetos and Watcher uses it instead of accessing
Prometheus directly.

At first this was generated by asking cursor to implement the linked spec
with some additional prompts for some smaller changes. Afterwards I manually
went through the code doing some cleanups, ensuring it complies with
PEP8 and hacking and so on. Later on I manually adjusted the code to use
the latest observabilityclient changes.
The zuul job was also mostly generated by cursor.

Implements: https://blueprints.launchpad.net/watcher/+spec/prometheus-multitenancy-support

Generated-By: Cursor with claude-4-sonnet model
Change-Id: I72c2171f72819bbde6c9cbbf565ee895e5d2bd53
Signed-off-by: Jaromir Wysoglad <jwysogla@redhat.com>
This commit is contained in:
Jaromir Wysoglad
2025-07-22 03:16:34 -04:00
parent 355671e979
commit 8309d9848a
17 changed files with 1736 additions and 1041 deletions

View File

@@ -264,10 +264,41 @@
ceilometer-acompute: false
node_exporter: false
- job:
name: watcher-aetos-integration
parent: watcher-sg-core-tempest-base
description: |
This job tests Watcher with Aetos reverse-proxy for Prometheus
using Keystone authentication instead of direct Prometheus access.
required-projects:
- openstack/python-observabilityclient
- openstack/aetos
vars: &aetos_vars
devstack_plugins:
ceilometer: https://opendev.org/openstack/ceilometer
sg-core: https://github.com/openstack-k8s-operators/sg-core
watcher: https://opendev.org/openstack/watcher
devstack-plugin-prometheus: https://opendev.org/openstack/devstack-plugin-prometheus
aetos: https://opendev.org/openstack/aetos
devstack_local_conf:
post-config:
$WATCHER_CONF:
watcher_datasources:
datasources: aetos
aetos_client:
interface: public
region_name: RegionOne
fqdn_label: fqdn
instance_uuid_label: resource
test-config:
$TEMPEST_CONFIG:
optimize:
datasource: prometheus
- job:
name: watcher-prometheus-integration-realdata
parent: watcher-sg-core-tempest-base
vars:
vars: &realdata_vars
devstack_services:
ceilometer-acompute: true
node_exporter: true
@@ -282,7 +313,7 @@
# All tests inside watcher_tempest_plugin.tests.scenario with tag "real_load"
tempest_test_regex: (^watcher_tempest_plugin.tests.scenario)(.*\[.*\breal_load\b.*\].*)
tempest_exclude_regex: ""
group-vars:
group-vars: &realdata_group_vars
subnode:
devstack_services:
ceilometer-acompute: true
@@ -305,6 +336,12 @@
vars:
tox_envlist: py3-threading
- job:
name: watcher-aetos-integration-realdata
parent: watcher-aetos-integration
vars: *realdata_vars
group-vars: *realdata_group_vars
- project:
queue: watcher
templates:
@@ -326,6 +363,7 @@
- watcher-tempest-functional-ipv6-only
- watcher-prometheus-integration
- watcher-prometheus-integration-threading
- watcher-aetos-integration
gate:
jobs:
- watcher-tempest-functional
@@ -333,6 +371,8 @@
experimental:
jobs:
- watcher-prometheus-integration-realdata
- watcher-aetos-integration-realdata
periodic-weekly:
jobs:
- watcher-prometheus-integration-realdata
- watcher-aetos-integration-realdata

View File

@@ -0,0 +1,157 @@
================
Aetos datasource
================
Synopsis
--------
The Aetos datasource allows Watcher to use an Aetos reverse proxy server as the
source for collected metrics used by the Watcher decision engine. Aetos is a
multi-tenant aware reverse proxy that sits in front of a Prometheus server and
provides Keystone authentication and role-based access control. The Aetos
datasource uses Keystone service discovery to locate the Aetos endpoint and
requires authentication via Keystone tokens.
Requirements
-------------
The Aetos datasource has the following requirements:
* An Aetos reverse proxy server deployed in front of Prometheus
* Aetos service registered in Keystone with service type 'metric-storage'
* Valid Keystone credentials for Watcher with admin or service role
* Prometheus metrics with appropriate labels (same as direct Prometheus access)
Like the Prometheus datasource, it is required that Prometheus metrics contain
a label to identify the hostname of the exporter from which the metric was
collected. This is used to match against the Watcher cluster model
``ComputeNode.hostname``. The default for this label is ``fqdn`` and in the
prometheus scrape configs would look like:
.. code-block::
scrape_configs:
- job_name: node
static_configs:
- targets: ['10.1.2.3:9100']
labels:
fqdn: "testbox.controlplane.domain"
This default can be overridden when a deployer uses a different label to
identify the exporter host (for example ``hostname`` or ``host``, or any other
label, as long as it identifies the host).
Internally this label is used in creating ``fqdn_instance_labels``, containing
the list of values assigned to the label in the Prometheus targets.
The elements of the resulting fqdn_instance_labels are expected to match the
``ComputeNode.hostname`` used in the Watcher decision engine cluster model.
An example ``fqdn_instance_labels`` is the following:
.. code-block::
[
'ena.controlplane.domain',
'dio.controlplane.domain',
'tria.controlplane.domain',
]
For instance metrics, it is required that Prometheus contains a label
with the uuid of the OpenStack instance in each relevant metric. By default,
the datasource will look for the label ``resource``. The
``instance_uuid_label`` config option in watcher.conf allows deployers to
override this default to any other label name that stores the ``uuid``.
Limitations
-----------
The Aetos datasource shares the same limitations as the Prometheus datasource:
The current implementation doesn't support the ``statistic_series`` function of
the Watcher ``class DataSourceBase``. It is expected that the
``statistic_aggregation`` function (which is implemented) is sufficient in
providing the **current** state of the managed resources in the cluster.
The ``statistic_aggregation`` function defaults to querying back 300 seconds,
starting from the present time (the time period is a function parameter and
can be set to a value as required). Implementing the ``statistic_series`` can
always be re-visited if the requisite interest and work cycles are volunteered
by the interested parties.
One further note about a limitation in the implemented
``statistic_aggregation`` function. This function is defined with a
``granularity`` parameter, to be used when querying whichever of the Watcher
``DataSourceBase`` metrics providers. In the case of Aetos (like Prometheus),
we do not fetch and then process individual metrics across the specified time
period. Instead we use the PromQL querying operators and functions, so that the
server itself will process the request across the specified parameters and
then return the result. So ``granularity`` parameter is redundant and remains
unused for the Aetos implementation of ``statistic_aggregation``. The
granularity of the data fetched by Prometheus server is specified in
configuration as the server ``scrape_interval`` (current default 15 seconds).
Additionally, there is a slight performance impact compared to direct
Prometheus access. Since Aetos acts as a reverse proxy in front of Prometheus,
there is an additional step for each request, resulting in slightly longer
delays.
Configuration
-------------
A deployer must set the ``datasources`` parameter to include ``aetos``
under the watcher_datasources section of watcher.conf (or add ``aetos`` in
datasources for a specific strategy if preferred eg. under the
``[watcher_strategies.workload_stabilization]`` section).
.. note::
Having both Prometheus and Aetos datasources configured at the same time
is not supported and will result in a configuration error. Allowing this
can be investigated in the future if a need or a proper use case is
identified.
The watcher.conf configuration file is also used to set the parameter values
required by the Watcher Aetos data source. The configuration can be
added under the ``[aetos_client]`` section and the available options are
duplicated below from the code as they are self documenting:
.. code-block::
cfg.StrOpt('interface',
default='public',
choices=['internal', 'public', 'admin'],
help="Type of endpoint to use in keystoneclient."),
cfg.StrOpt('region_name',
help="Region in Identity service catalog to use for "
"communication with the OpenStack service."),
cfg.StrOpt('fqdn_label',
default='fqdn',
help="The label that Prometheus uses to store the fqdn of "
"exporters. Defaults to 'fqdn'."),
cfg.StrOpt('instance_uuid_label',
default='resource',
help="The label that Prometheus uses to store the uuid of "
"OpenStack instances. Defaults to 'resource'."),
Authentication and Service Discovery
------------------------------------
Unlike the Prometheus datasource which requires explicit host and port
configuration, the Aetos datasource uses Keystone service discovery to
automatically locate the Aetos endpoint. The datasource:
1. Uses the configured Keystone credentials to authenticate
2. Searches the service catalog for a service with type 'metric-storage'
3. Uses the discovered endpoint URL to connect to Aetos
4. Attaches a Keystone token to each request for authentication
If the Aetos service is not registered in Keystone, the datasource will
fail to initialize and prevent the decision engine from starting.
So a sample watcher.conf configured to use the Aetos datasource would look
like the following:
.. code-block::
[watcher_datasources]
datasources = aetos
[aetos_client]
interface = public
region_name = RegionOne
fqdn_label = fqdn

View File

@@ -30,7 +30,7 @@ identify the exporter host (for example ``hostname`` or ``host``, or any other
label, as long as it identifies the host).
Internally this label is used in creating ``fqdn_instance_labels``, containing
the list of values assigned to the the label in the Prometheus targets.
the list of values assigned to the label in the Prometheus targets.
The elements of the resulting fqdn_instance_labels are expected to match the
``ComputeNode.hostname`` used in the Watcher decision engine cluster model.
An example ``fqdn_instance_labels`` is the following:
@@ -47,7 +47,7 @@ For instance metrics, it is required that Prometheus contains a label
with the uuid of the OpenStack instance in each relevant metric. By default,
the datasource will look for the label ``resource``. The
``instance_uuid_label`` config option in watcher.conf allows deployers to
override this default to any other label name that stores the ``uuid``.
override this default to any other label name that stores the ``uuid``.
Limitations
-----------

View File

@@ -0,0 +1,13 @@
---
features:
- |
A new Aetos data source is added. This allows the watcher decision
engine to collect metrics through an Aetos reverse proxy server which
provides multi-tenant aware access to Prometheus with Keystone
authentication and role-based access control. The Aetos datasource
uses Keystone service discovery to automatically locate the Aetos
endpoint and provides enhanced security compared to direct Prometheus
access. For more information about the Aetos data source, including
configuration options see
https://docs.openstack.org/watcher/latest/datasources/aetos.html

View File

@@ -37,7 +37,7 @@ python-keystoneclient>=3.15.0 # Apache-2.0
python-monascaclient>=1.12.0 # Apache-2.0
python-neutronclient>=6.7.0 # Apache-2.0
python-novaclient>=14.1.0 # Apache-2.0
python-observabilityclient>=0.3.0 # Apache-2.0
python-observabilityclient>=1.1.0 # Apache-2.0
python-openstackclient>=3.14.0 # Apache-2.0
python-ironicclient>=2.5.0 # Apache-2.0
SQLAlchemy>=1.2.5 # MIT

View File

@@ -512,3 +512,8 @@ class NotificationPayloadError(WatcherException):
class InvalidPoolAttributeValue(Invalid):
msg_fmt = _("The %(name)s pool %(attribute)s is not integer")
class DataSourceConfigConflict(UnsupportedError):
msg_fmt = _("Datasource %(datasource_one)s is not supported "
"when datasource %(datasource_two)s is also enabled.")

View File

@@ -19,6 +19,7 @@
from oslo_config import cfg
from watcher.conf import aetos_client
from watcher.conf import api
from watcher.conf import applier
from watcher.conf import cinder_client
@@ -47,6 +48,7 @@ from watcher.conf import service
CONF = cfg.CONF
service.register_opts(CONF)
aetos_client.register_opts(CONF)
api.register_opts(CONF)
paths.register_opts(CONF)
exception.register_opts(CONF)

View File

@@ -0,0 +1,49 @@
# Copyright 2025 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from oslo_config import cfg
aetos_client = cfg.OptGroup(name='aetos_client',
title='Configuration Options for Aetos',
help="See https://docs.openstack.org/watcher/"
"latest/datasources/aetos.html for "
"details on how these options are used.")
AETOS_CLIENT_OPTS = [
cfg.StrOpt('interface',
default='public',
choices=['internal', 'public', 'admin'],
help="Type of endpoint to use in keystoneclient."),
cfg.StrOpt('region_name',
help="Region in Identity service catalog to use for "
"communication with the OpenStack service."),
cfg.StrOpt('fqdn_label',
default='fqdn',
help="The label that Prometheus uses to store the fqdn of "
"exporters. Defaults to 'fqdn'."),
cfg.StrOpt('instance_uuid_label',
default='resource',
help="The label that Prometheus uses to store the uuid of "
"OpenStack instances. Defaults to 'resource'."),
]
def register_opts(conf):
conf.register_group(aetos_client)
conf.register_opts(AETOS_CLIENT_OPTS, group=aetos_client)
def list_opts():
return [(aetos_client, AETOS_CLIENT_OPTS)]

View File

@@ -18,6 +18,7 @@
from oslo_config import cfg
from watcher.decision_engine.datasources import aetos
from watcher.decision_engine.datasources import manager
datasources = cfg.OptGroup(name='watcher_datasources',
@@ -26,6 +27,12 @@ datasources = cfg.OptGroup(name='watcher_datasources',
possible_datasources = list(manager.DataSourceManager.metric_map.keys())
# NOTE(jwysogla): Having the Aetos and Prometheus datasources specified at the
# same time raises a DataSourceConfigConflict exception. So remove the Aetos
# datasource from the list to have a valid default configuration.
default_datasources = list(possible_datasources)
default_datasources.remove(aetos.AetosHelper.NAME)
DATASOURCES_OPTS = [
cfg.ListOpt("datasources",
help="Datasources to use in order to query the needed metrics."
@@ -34,7 +41,7 @@ DATASOURCES_OPTS = [
" the default for all strategies unless a strategy has a"
" specific override.",
item_type=cfg.types.String(choices=possible_datasources),
default=possible_datasources),
default=default_datasources),
cfg.IntOpt('query_max_retries',
min=1,
default=10,

View File

@@ -0,0 +1,67 @@
# Copyright 2025 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from observabilityclient.utils import metric_utils as obs_client_utils
from oslo_config import cfg
from oslo_log import log
from watcher.common import clients
from watcher.decision_engine.datasources import prometheus_base
CONF = cfg.CONF
LOG = log.getLogger(__name__)
class AetosHelper(prometheus_base.PrometheusBase):
"""AetosHelper class for retrieving metrics from Aetos
This class implements the PrometheusBase to allow Watcher to query
Aetos as a data source for metrics.
"""
NAME = 'aetos'
def __init__(self, osc=None):
"""Initialize AetosHelper with optional OpenStackClients instance
:param osc: OpenStackClients instance for Keystone authentication
"""
self.osc = osc if osc else clients.OpenStackClients()
super(AetosHelper, self).__init__()
def _get_fqdn_label(self):
"""Get the FQDN label from aetos_client config"""
return CONF.aetos_client.fqdn_label
def _get_instance_uuid_label(self):
"""Get the instance UUID label from aetos_client config"""
return CONF.aetos_client.instance_uuid_label
def _setup_prometheus_client(self):
"""Initialize the prometheus client for Aetos with Keystone auth
:return: PrometheusAPIClient instance configured for Aetos
"""
# Get Keystone session from OpenStackClients
session = self.osc.session
opts = {'interface': CONF.aetos_client.interface,
'region_name': CONF.aetos_client.region_name,
'service_type': 'metric-storage'}
the_client = obs_client_utils.get_prom_client_from_keystone(
session, adapter_options=opts
)
return the_client

View File

@@ -21,6 +21,7 @@ from oslo_config import cfg
from oslo_log import log
from watcher.common import exception
from watcher.decision_engine.datasources import aetos
from watcher.decision_engine.datasources import gnocchi as gnoc
from watcher.decision_engine.datasources import grafana as graf
from watcher.decision_engine.datasources import monasca as mon
@@ -36,6 +37,7 @@ class DataSourceManager(object):
(mon.MonascaHelper.NAME, mon.MonascaHelper.METRIC_MAP),
(graf.GrafanaHelper.NAME, graf.GrafanaHelper.METRIC_MAP),
(prom.PrometheusHelper.NAME, prom.PrometheusHelper.METRIC_MAP),
(aetos.AetosHelper.NAME, aetos.AetosHelper.METRIC_MAP),
])
"""Dictionary with all possible datasources, dictionary order is
the default order for attempting to use datasources
@@ -48,6 +50,7 @@ class DataSourceManager(object):
self._gnocchi = None
self._grafana = None
self._prometheus = None
self._aetos = None
# Dynamically update grafana metric map, only available at runtime
# The metric map can still be overridden by a yaml config file
@@ -67,6 +70,24 @@ class DataSourceManager(object):
LOG.warning('The monasca datasource is deprecated and will be '
'removed in a future release.')
self._validate_datasource_config()
def _validate_datasource_config(self):
"""Validate datasource configuration
Checks for configuration conflicts, such as having both prometheus
and aetos datasources configured simultaneously.
"""
if (self.datasources and
prom.PrometheusHelper.NAME in self.datasources and
aetos.AetosHelper.NAME in self.datasources):
LOG.error("Configuration error: Cannot use both prometheus "
"and aetos datasources simultaneously.")
raise exception.DataSourceConfigConflict(
datasource_one=prom.PrometheusHelper.NAME,
datasource_two=aetos.AetosHelper.NAME
)
@property
def monasca(self):
if self._monasca is None:
@@ -107,6 +128,16 @@ class DataSourceManager(object):
def prometheus(self, prometheus):
self._prometheus = prometheus
@property
def aetos(self):
if self._aetos is None:
self._aetos = aetos.AetosHelper(osc=self.osc)
return self._aetos
@aetos.setter
def aetos(self, aetos):
self._aetos = aetos
def get_backend(self, metrics):
"""Determine the datasource to use from the configuration

View File

@@ -12,63 +12,36 @@
# License for the specific language governing permissions and limitations
# under the License.
#
from observabilityclient import prometheus_client
from oslo_config import cfg
from oslo_log import log
import re
from watcher._i18n import _
from watcher.common import exception
from watcher.decision_engine.datasources import base
from watcher.decision_engine.datasources import prometheus_base
CONF = cfg.CONF
LOG = log.getLogger(__name__)
class PrometheusHelper(base.DataSourceBase):
class PrometheusHelper(prometheus_base.PrometheusBase):
"""PrometheusHelper class for retrieving metrics from Prometheus server
This class implements the DataSourceBase to allow Watcher to query
This class implements the PrometheusBase to allow Watcher to query
Prometheus as a data source for metrics.
"""
NAME = 'prometheus'
METRIC_MAP = dict(host_cpu_usage='node_cpu_seconds_total',
host_ram_usage='node_memory_MemAvailable_bytes',
host_outlet_temp=None,
host_inlet_temp=None,
host_airflow=None,
host_power=None,
instance_cpu_usage='ceilometer_cpu',
instance_ram_usage='ceilometer_memory_usage',
instance_ram_allocated='instance.memory',
instance_l3_cache_usage=None,
instance_root_disk_size='instance.disk',
)
AGGREGATES_MAP = dict(mean='avg', max='max', min='min', count='avg')
def __init__(self):
"""Initialise the PrometheusHelper
def _get_fqdn_label(self):
"""Get the FQDN label from prometheus_client config"""
return CONF.prometheus_client.fqdn_label
The prometheus helper uses the PrometheusAPIClient provided by
python-observabilityclient.
The prometheus_fqdn_labels contains a list the values contained in
the fqdn_label in the Prometheus instance. When making queries to
Prometheus we use the fqdn_label to specify the node for which
metrics are to be retrieved.
host, port and fqdn_label come from watcher_client
config. The prometheus_fqdn_label allows override of the required label
in Prometheus scrape configs that specifies each target's fqdn.
"""
self.prometheus = self._setup_prometheus_client()
self.prometheus_fqdn_label = (
CONF.prometheus_client.fqdn_label
)
self.prometheus_fqdn_labels = (
self._build_prometheus_fqdn_labels()
)
self.prometheus_host_instance_map = (
self._build_prometheus_host_instance_map()
)
def _get_instance_uuid_label(self):
"""Get the instance UUID label from prometheus_client config"""
return CONF.prometheus_client.instance_uuid_label
def _setup_prometheus_client(self):
"""Initialise the prometheus client with config options
@@ -135,385 +108,3 @@ class PrometheusHelper(base.DataSourceBase):
the_client.set_basic_auth(prometheus_user, prometheus_pass)
return the_client
def _build_prometheus_fqdn_labels(self):
"""Build the list of fqdn_label values to be used in host queries
Watcher knows nodes by their hostname. In Prometheus however the
scrape targets (also known as 'instances') are specified by I.P.
(or hostname) and port number and fqdn is stored in a custom 'fqdn'
label added to Prometheus scrape_configs. Operators can use a
different custom label instead by setting the prometheus_fqdn_label
config option under the prometheus_client section of watcher config.
The built prometheus_fqdn_labels is created with the full list
of values of the prometheus_fqdn_label label in Prometheus. This will
be used to create a map of hostname<-->fqdn and to identify if a target
exist in prometheus for the compute nodes before sending the query.
:return a set of values of the fqdn label. For example:
{'foo.example.com', 'bar.example.com'}
{'foo', 'bar'}
"""
prometheus_targets = self.prometheus._get(
"targets?state=active")['data']['activeTargets']
# >>> prometheus_targets[0]['labels']
# {'fqdn': 'marios-env-again.controlplane.domain',
# 'instance': 'localhost:9100', 'job': 'node'}
fqdn_instance_labels = set()
for target in prometheus_targets:
if target.get('labels', {}).get(self.prometheus_fqdn_label):
fqdn_instance_labels.add(
target['labels'].get(self.prometheus_fqdn_label))
if not fqdn_instance_labels:
LOG.error(
"Could not create fqdn labels list from Prometheus "
"targets config. Prometheus returned the following: %s",
prometheus_targets
)
return set()
return fqdn_instance_labels
def _build_prometheus_host_instance_map(self):
"""Build the hostname<-->instance_label mapping needed for queries
The prometheus_fqdn_labels has the fully qualified domain name
for hosts. This will create a duplicate map containing only the host
name part. Depending on the watcher node.hostname either the
fqdn_instance_labels or the host_instance_map will be used to resolve
the correct prometheus fqdn_label for queries. In the event the
fqdn_instance_labels elements are not valid fqdn (for example it has
hostnames, not fqdn) the host_instance_map cannot be created and
an empty dictionary is returned with a warning logged.
:return a dict mapping hostname to instance label. For example:
{'foo': 'foo.example.com', 'bar': 'bar.example.com'}
"""
if not self.prometheus_fqdn_labels:
LOG.error("Cannot build host_instance_map without "
"fqdn_instance_labels")
return {}
host_instance_map = {
host: fqdn for (host, fqdn) in (
(fqdn.split('.')[0], fqdn)
for fqdn in self.prometheus_fqdn_labels
if '.' in fqdn
)
}
if not host_instance_map:
LOG.warning("Creating empty host instance map. Are the keys "
"in prometheus_fqdn_labels valid fqdn?")
return {}
return host_instance_map
def _resolve_prometheus_instance_label(self, node_name):
"""Resolve the prometheus instance label to use in queries
Given the watcher node.hostname, resolve the prometheus instance
label for use in queries, first trying the fqdn_instance_labels and
then the host_instance_map (watcher.node_name can be fqdn or hostname).
If the name is not resolved after the first attempt, rebuild the fqdn
and host instance maps and try again. This allows for new hosts added
after the initialisation of the fqdn_instance_labels.
:param node_name: the watcher node.hostname
:return String for the prometheus instance label and None if not found
"""
def _query_maps(node):
if node in self.prometheus_fqdn_labels:
return node
else:
return self.prometheus_host_instance_map.get(node, None)
instance_label = _query_maps(node_name)
# refresh the fqdn and host instance maps and retry
if not instance_label:
self.prometheus_fqdn_labels = (
self._build_prometheus_fqdn_labels()
)
self.prometheus_host_instance_map = (
self._build_prometheus_host_instance_map()
)
instance_label = _query_maps(node_name)
if not instance_label:
LOG.error("Cannot query prometheus without instance label. "
"Could not resolve %s", node_name)
return None
return instance_label
def _resolve_prometheus_aggregate(self, watcher_aggregate, meter):
"""Resolve the prometheus aggregate using self.AGGREGATES_MAP
This uses the AGGREGATES_MAP to resolve the correct prometheus
aggregate to use in queries, from the given watcher aggregate
"""
if watcher_aggregate == 'count':
LOG.warning('Prometheus data source does not currently support '
' the count aggregate. Proceeding with mean (avg).')
promql_aggregate = self.AGGREGATES_MAP.get(watcher_aggregate)
if not promql_aggregate:
raise exception.InvalidParameter(
message=(_("Unknown Watcher aggregate %s. This does not "
"resolve to any valid prometheus query aggregate.")
% watcher_aggregate)
)
return promql_aggregate
def _build_prometheus_query(self, aggregate, meter, instance_label,
period, resource=None):
"""Build and return the prometheus query string with the given args
This function builds and returns the string query that will be sent
to the Prometheus server /query endpoint. For host cpu usage we use:
100 - (avg by (fqdn)(rate(node_cpu_seconds_total{mode='idle',
fqdn='some_host'}[300s])) * 100)
so using prometheus rate function over the specified period, we average
per instance (all cpus) idle time and then 'everything else' is cpu
usage time.
For host memory usage we use:
(node_memory_MemTotal_bytes{instance='the_host'} -
avg_over_time(
node_memory_MemAvailable_bytes{instance='the_host'}[300s]))
/ 1024
So we take total and subtract available memory to determine
how much is in use. We use the prometheus xxx_over_time functions
avg/max/min depending on the aggregate with the specified time period.
:param aggregate: one of the values of self.AGGREGATES_MAP
:param meter: the name of the Prometheus meter to use
:param instance_label: the Prometheus instance label (scrape target).
:param period: the period in seconds for which to query
:param resource: the resource object for which metrics are requested
:return: a String containing the Prometheus query
:raises watcher.common.exception.InvalidParameter if params are None
:raises watcher.common.exception.InvalidParameter if meter is not
known or currently supported (prometheus meter name).
"""
query_args = None
uuid_label_key = CONF.prometheus_client.instance_uuid_label
if (meter is None or aggregate is None or instance_label is None or
period is None):
raise exception.InvalidParameter(
message=(_(
"Cannot build prometheus query without args. "
"You provided: meter %(mtr)s, aggregate %(agg)s, "
"instance_label %(inst)s, period %(prd)s")
% {'mtr': meter, 'agg': aggregate,
'inst': instance_label, 'prd': period})
)
if meter == 'node_cpu_seconds_total':
query_args = (
"100 - (%(agg)s by (%(label)s)(rate(%(meter)s"
"{mode='idle',%(label)s='%(label_value)s'}[%(period)ss])) "
"* 100)"
% {'label': self.prometheus_fqdn_label,
'label_value': instance_label, 'agg': aggregate,
'meter': meter, 'period': period}
)
elif meter == 'node_memory_MemAvailable_bytes':
# Prometheus metric is in B and we need to return KB
query_args = (
"(node_memory_MemTotal_bytes{%(label)s='%(label_value)s'} "
"- %(agg)s_over_time(%(meter)s{%(label)s='%(label_value)s'}"
"[%(period)ss])) / 1024"
% {'label': self.prometheus_fqdn_label,
'label_value': instance_label, 'agg': aggregate,
'meter': meter, 'period': period}
)
elif meter == 'ceilometer_memory_usage':
query_args = (
"%s_over_time(%s{%s='%s'}[%ss])" %
(aggregate, meter, uuid_label_key, instance_label, period)
)
elif meter == 'ceilometer_cpu':
# We are converting the total cumulative cpu time (ns) to cpu usage
# percentage so we need to divide between the number of vcpus.
# As this is a percentage metric, we set a max level of 100. It has
# been observed in very high usage cases, prometheus reporting
# values higher that 100 what can lead to unexpected behaviors.
vcpus = resource.vcpus
if not vcpus:
LOG.warning(
"instance vcpu count not set for instance %s, assuming 1",
instance_label
)
vcpus = 1
query_args = (
"clamp_max((%(agg)s by (%(label)s)"
"(rate(%(meter)s{%(label)s='%(label_value)s'}[%(period)ss]))"
"/10e+8) *(100/%(vcpus)s), 100)"
% {'label': uuid_label_key, 'label_value': instance_label,
'agg': aggregate, 'meter': meter, 'period': period,
'vcpus': vcpus}
)
else:
raise exception.InvalidParameter(
message=(_("Cannot process prometheus meter %s") % meter)
)
return query_args
def check_availability(self):
"""check if Prometheus server is available for queries
Performs HTTP get on the prometheus API /status/runtimeinfo endpoint.
The prometheus_client will raise a PrometheuAPIClientError if the
call is unsuccessful, which is caught here and a warning logged.
"""
try:
self.prometheus._get("status/runtimeinfo")
except prometheus_client.PrometheusAPIClientError:
LOG.warning(
"check_availability raised PrometheusAPIClientError. "
"Is Prometheus server down?"
)
return 'not available'
return 'available'
def list_metrics(self):
"""Fetch all prometheus metrics from api/v1/label/__name__/values
The prometheus_client will raise a PrometheuAPIClientError if the
call is unsuccessful, which is caught here and a warning logged.
"""
try:
response = self.prometheus._get("label/__name__/values")
except prometheus_client.PrometheusAPIClientError:
LOG.warning(
"list_metrics raised PrometheusAPIClientError. Is Prometheus"
"server down?"
)
return set()
return set(response['data'])
def statistic_aggregation(self, resource=None, resource_type=None,
meter_name=None, period=300, aggregate='mean',
granularity=300):
meter = self._get_meter(meter_name)
query_args = ''
instance_label = ''
# For instance resource type, the datasource expects the uuid of the
# instance to be assigned to a label in the prometheus metrics, with a
# specific key value.
if resource_type == 'compute_node':
instance_label = self._resolve_prometheus_instance_label(
resource.hostname)
elif resource_type == 'instance':
instance_label = resource.uuid
# For ram_allocated and root_disk size metrics there are no valid
# values in the prometheus backend store. We rely in the values
# provided in the vms inventory.
if meter == 'instance.memory':
return float(resource.memory)
elif meter == 'instance.disk':
return float(resource.disk)
else:
LOG.warning(
"Prometheus data source does not currently support "
"resource_type %s", resource_type
)
return None
promql_aggregate = self._resolve_prometheus_aggregate(aggregate, meter)
query_args = self._build_prometheus_query(
promql_aggregate, meter, instance_label, period, resource
)
if not query_args:
LOG.error("Cannot proceed without valid prometheus query")
return None
result = self.query_retry(
self.prometheus.query, query_args,
ignored_exc=prometheus_client.PrometheusAPIClientError,
)
return float(result[0].value) if result else None
def statistic_series(self, resource=None, resource_type=None,
meter_name=None, start_time=None, end_time=None,
granularity=300):
raise NotImplementedError(
_('Prometheus helper currently does not support statistic_series. '
'This can be considered for future enhancement.'))
def _invert_max_min_aggregate(self, agg):
"""Invert max and min for node/host metric queries from node-exporter
because we query for 'idle'/'unused' cpu and memory.
For Watcher 'max cpu used' we query for prometheus 'min idle time'.
For Watcher 'max memory used' we retrieve min 'unused'/'available'
memory from Prometheus. This internal function is used exclusively
by get_host_cpu_usage and get_host_ram_usage.
:param agg: the metric collection aggregate
:return: a String aggregate
"""
if agg == 'max':
return 'min'
elif agg == 'min':
return 'max'
return agg
def get_host_cpu_usage(self, resource, period=300,
aggregate="mean", granularity=None):
"""Query prometheus for node_cpu_seconds_total
This calculates the host cpu usage and returns it as a percentage
The calculation is made by using the cpu 'idle' time, per
instance (so all CPUs are included). For example the query looks like
(100 - (avg by (fqdn)(rate(node_cpu_seconds_total
{mode='idle',fqdn='compute1.example.com'}[300s])) * 100))
"""
aggregate = self._invert_max_min_aggregate(aggregate)
cpu_usage = self.statistic_aggregation(
resource, 'compute_node',
'host_cpu_usage', period=period,
granularity=granularity, aggregate=aggregate)
return float(cpu_usage) if cpu_usage else None
def get_host_ram_usage(self, resource, period=300,
aggregate="mean", granularity=None):
aggregate = self._invert_max_min_aggregate(aggregate)
ram_usage = self.statistic_aggregation(
resource, 'compute_node',
'host_ram_usage', period=period,
granularity=granularity, aggregate=aggregate)
return float(ram_usage) if ram_usage else None
def get_instance_ram_usage(self, resource, period=300,
aggregate="mean", granularity=None):
ram_usage = self.statistic_aggregation(
resource, 'instance',
'instance_ram_usage', period=period,
granularity=granularity, aggregate=aggregate)
return ram_usage
def get_instance_cpu_usage(self, resource, period=300,
aggregate="mean", granularity=None):
cpu_usage = self.statistic_aggregation(
resource, 'instance',
'instance_cpu_usage', period=period,
granularity=granularity, aggregate=aggregate)
return cpu_usage
def get_instance_ram_allocated(self, resource, period=300,
aggregate="mean", granularity=None):
ram_allocated = self.statistic_aggregation(
resource, 'instance',
'instance_ram_allocated', period=period,
granularity=granularity, aggregate=aggregate)
return ram_allocated
def get_instance_root_disk_size(self, resource, period=300,
aggregate="mean", granularity=None):
root_disk_size = self.statistic_aggregation(
resource, 'instance',
'instance_root_disk_size', period=period,
granularity=granularity, aggregate=aggregate)
return root_disk_size

View File

@@ -0,0 +1,480 @@
# Copyright 2025 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import abc
from observabilityclient import prometheus_client
from oslo_config import cfg
from oslo_log import log
from watcher._i18n import _
from watcher.common import exception
from watcher.decision_engine.datasources import base
CONF = cfg.CONF
LOG = log.getLogger(__name__)
class PrometheusBase(base.DataSourceBase):
"""Base class for Prometheus-based datasources
This class contains shared functionality for querying Prometheus-like
metrics sources. Subclasses should implement _setup_prometheus_client
to provide the appropriate client configuration.
"""
METRIC_MAP = dict(host_cpu_usage='node_cpu_seconds_total',
host_ram_usage='node_memory_MemAvailable_bytes',
host_outlet_temp=None,
host_inlet_temp=None,
host_airflow=None,
host_power=None,
instance_cpu_usage='ceilometer_cpu',
instance_ram_usage='ceilometer_memory_usage',
instance_ram_allocated='instance.memory',
instance_l3_cache_usage=None,
instance_root_disk_size='instance.disk',
)
AGGREGATES_MAP = dict(mean='avg', max='max', min='min', count='avg')
def __init__(self):
"""Initialise the PrometheusBase
The prometheus based datasource classes use the PrometheusAPIClient
provided by python-observabilityclient.
The prometheus_fqdn_labels contains a list the values contained in
the fqdn_label in the Prometheus instance. When making queries to
Prometheus we use the fqdn_label to specify the node for which
metrics are to be retrieved.
The fqdn_label comes from watcher_client
config. The prometheus_fqdn_label allows override of the required label
in Prometheus scrape configs that specifies each target's fqdn.
"""
self.prometheus = self._setup_prometheus_client()
self.prometheus_fqdn_label = self._get_fqdn_label()
self.prometheus_fqdn_labels = (
self._build_prometheus_fqdn_labels()
)
self.prometheus_host_instance_map = (
self._build_prometheus_host_instance_map()
)
@abc.abstractmethod
def _setup_prometheus_client(self):
"""Initialize the prometheus client with appropriate configuration
Subclasses must implement this method to provide their specific
client configuration (direct connection, keystone auth, etc.)
:return: PrometheusAPIClient instance
"""
raise NotImplementedError()
@abc.abstractmethod
def _get_fqdn_label(self):
"""Get the FQDN label configuration
:return: String containing the FQDN label name
"""
raise NotImplementedError()
@abc.abstractmethod
def _get_instance_uuid_label(self):
"""Get the instance UUID label configuration
:return: String containing the instance UUID label name
"""
raise NotImplementedError()
def _build_prometheus_fqdn_labels(self):
"""Build the list of fqdn_label values to be used in host queries
Watcher knows nodes by their hostname. In Prometheus however the
scrape targets (also known as 'instances') are specified by I.P.
(or hostname) and port number and fqdn is stored in a custom 'fqdn'
label added to Prometheus scrape_configs. Operators can use a
different custom label instead by setting the prometheus_fqdn_label
config option under the prometheus_client section of watcher config.
The built prometheus_fqdn_labels is created with the full list
of values of the prometheus_fqdn_label label in Prometheus. This will
be used to create a map of hostname<-->fqdn and to identify if a target
exist in prometheus for the compute nodes before sending the query.
:return a set of values of the fqdn label. For example:
{'foo.example.com', 'bar.example.com'}
{'foo', 'bar'}
"""
prometheus_targets = self.prometheus._get(
"targets?state=active")['data']['activeTargets']
# >>> prometheus_targets[0]['labels']
# {'fqdn': 'marios-env-again.controlplane.domain',
# 'instance': 'localhost:9100', 'job': 'node'}
fqdn_instance_labels = set()
for target in prometheus_targets:
if target.get('labels', {}).get(self.prometheus_fqdn_label):
fqdn_instance_labels.add(
target['labels'].get(self.prometheus_fqdn_label))
if not fqdn_instance_labels:
LOG.error(
"Could not create fqdn labels list from Prometheus "
"targets config. Prometheus returned the following: %s",
prometheus_targets
)
return set()
return fqdn_instance_labels
def _build_prometheus_host_instance_map(self):
"""Build the hostname<-->instance_label mapping needed for queries
The prometheus_fqdn_labels has the fully qualified domain name
for hosts. This will create a duplicate map containing only the host
name part. Depending on the watcher node.hostname either the
fqdn_instance_labels or the host_instance_map will be used to resolve
the correct prometheus fqdn_label for queries. In the event the
fqdn_instance_labels elements are not valid fqdn (for example it has
hostnames, not fqdn) the host_instance_map cannot be created and
an empty dictionary is returned with a warning logged.
:return a dict mapping hostname to instance label. For example:
{'foo': 'foo.example.com', 'bar': 'bar.example.com'}
"""
if not self.prometheus_fqdn_labels:
LOG.error("Cannot build host_instance_map without "
"fqdn_instance_labels")
return {}
host_instance_map = {
host: fqdn for (host, fqdn) in (
(fqdn.split('.')[0], fqdn)
for fqdn in self.prometheus_fqdn_labels
if '.' in fqdn
)
}
if not host_instance_map:
LOG.warning("Creating empty host instance map. Are the keys "
"in prometheus_fqdn_labels valid fqdn?")
return {}
return host_instance_map
def _resolve_prometheus_instance_label(self, node_name):
"""Resolve the prometheus instance label to use in queries
Given the watcher node.hostname, resolve the prometheus instance
label for use in queries, first trying the fqdn_instance_labels and
then the host_instance_map (watcher.node_name can be fqdn or hostname).
If the name is not resolved after the first attempt, rebuild the fqdn
and host instance maps and try again. This allows for new hosts added
after the initialisation of the fqdn_instance_labels.
:param node_name: the watcher node.hostname
:return String for the prometheus instance label and None if not found
"""
def _query_maps(node):
if node in self.prometheus_fqdn_labels:
return node
else:
return self.prometheus_host_instance_map.get(node, None)
instance_label = _query_maps(node_name)
# refresh the fqdn and host instance maps and retry
if not instance_label:
self.prometheus_fqdn_labels = (
self._build_prometheus_fqdn_labels()
)
self.prometheus_host_instance_map = (
self._build_prometheus_host_instance_map()
)
instance_label = _query_maps(node_name)
if not instance_label:
LOG.error("Cannot query prometheus without instance label. "
"Could not resolve %s", node_name)
return None
return instance_label
def _resolve_prometheus_aggregate(self, watcher_aggregate, meter):
"""Resolve the prometheus aggregate using self.AGGREGATES_MAP
This uses the AGGREGATES_MAP to resolve the correct prometheus
aggregate to use in queries, from the given watcher aggregate
"""
if watcher_aggregate == 'count':
LOG.warning('Prometheus data source does not currently support '
' the count aggregate. Proceeding with mean (avg).')
promql_aggregate = self.AGGREGATES_MAP.get(watcher_aggregate)
if not promql_aggregate:
raise exception.InvalidParameter(
message=(_("Unknown Watcher aggregate %s. This does not "
"resolve to any valid prometheus query aggregate.")
% watcher_aggregate)
)
return promql_aggregate
def _build_prometheus_query(self, aggregate, meter, instance_label,
period, resource=None):
"""Build and return the prometheus query string with the given args
This function builds and returns the string query that will be sent
to the Prometheus server /query endpoint. For host cpu usage we use:
100 - (avg by (fqdn)(rate(node_cpu_seconds_total{mode='idle',
fqdn='some_host'}[300s])) * 100)
so using prometheus rate function over the specified period, we average
per instance (all cpus) idle time and then 'everything else' is cpu
usage time.
For host memory usage we use:
(node_memory_MemTotal_bytes{instance='the_host'} -
avg_over_time(
node_memory_MemAvailable_bytes{instance='the_host'}[300s]))
/ 1024
So we take total and subtract available memory to determine
how much is in use. We use the prometheus xxx_over_time functions
avg/max/min depending on the aggregate with the specified time period.
:param aggregate: one of the values of self.AGGREGATES_MAP
:param meter: the name of the Prometheus meter to use
:param instance_label: the Prometheus instance label (scrape target).
:param period: the period in seconds for which to query
:param resource: the resource object for which metrics are requested
:return: a String containing the Prometheus query
:raises watcher.common.exception.InvalidParameter if params are None
:raises watcher.common.exception.InvalidParameter if meter is not
known or currently supported (prometheus meter name).
"""
query_args = None
uuid_label_key = self._get_instance_uuid_label()
if (meter is None or aggregate is None or instance_label is None or
period is None):
raise exception.InvalidParameter(
message=(_(
"Cannot build prometheus query without args. "
"You provided: meter %(mtr)s, aggregate %(agg)s, "
"instance_label %(inst)s, period %(prd)s")
% {'mtr': meter, 'agg': aggregate,
'inst': instance_label, 'prd': period})
)
if meter == 'node_cpu_seconds_total':
query_args = (
"100 - (%(agg)s by (%(label)s)(rate(%(meter)s"
"{mode='idle',%(label)s='%(label_value)s'}[%(period)ss])) "
"* 100)"
% {'label': self.prometheus_fqdn_label,
'label_value': instance_label, 'agg': aggregate,
'meter': meter, 'period': period}
)
elif meter == 'node_memory_MemAvailable_bytes':
# Prometheus metric is in B and we need to return KB
query_args = (
"(node_memory_MemTotal_bytes{%(label)s='%(label_value)s'} "
"- %(agg)s_over_time(%(meter)s{%(label)s='%(label_value)s'}"
"[%(period)ss])) / 1024"
% {'label': self.prometheus_fqdn_label,
'label_value': instance_label, 'agg': aggregate,
'meter': meter, 'period': period}
)
elif meter == 'ceilometer_memory_usage':
query_args = (
"%s_over_time(%s{%s='%s'}[%ss])" %
(aggregate, meter, uuid_label_key, instance_label, period)
)
elif meter == 'ceilometer_cpu':
# We are converting the total cumulative cpu time (ns) to cpu usage
# percentage so we need to divide between the number of vcpus.
# As this is a percentage metric, we set a max level of 100. It has
# been observed in very high usage cases, prometheus reporting
# values higher that 100 what can lead to unexpected behaviors.
vcpus = resource.vcpus
if not vcpus:
LOG.warning(
"instance vcpu count not set for instance %s, assuming 1",
instance_label
)
vcpus = 1
query_args = (
"clamp_max((%(agg)s by (%(label)s)"
"(rate(%(meter)s{%(label)s='%(label_value)s'}[%(period)ss]))"
"/10e+8) *(100/%(vcpus)s), 100)"
% {'label': uuid_label_key, 'label_value': instance_label,
'agg': aggregate, 'meter': meter, 'period': period,
'vcpus': vcpus}
)
else:
raise exception.InvalidParameter(
message=(_("Cannot process prometheus meter %s") % meter)
)
return query_args
def check_availability(self):
"""check if Prometheus server is available for queries
Performs HTTP get on the prometheus API /status/runtimeinfo endpoint.
The prometheus_client will raise a PrometheuAPIClientError if the
call is unsuccessful, which is caught here and a warning logged.
"""
try:
self.prometheus._get("status/runtimeinfo")
except prometheus_client.PrometheusAPIClientError:
LOG.warning(
"check_availability raised PrometheusAPIClientError. "
"Is Prometheus server down?"
)
return 'not available'
return 'available'
def list_metrics(self):
"""Fetch all prometheus metrics from api/v1/label/__name__/values
The prometheus_client will raise a PrometheuAPIClientError if the
call is unsuccessful, which is caught here and a warning logged.
"""
try:
response = self.prometheus._get("label/__name__/values")
except prometheus_client.PrometheusAPIClientError:
LOG.warning(
"list_metrics raised PrometheusAPIClientError. Is Prometheus"
"server down?"
)
return set()
return set(response['data'])
def statistic_aggregation(self, resource=None, resource_type=None,
meter_name=None, period=300, aggregate='mean',
granularity=300):
meter = self._get_meter(meter_name)
query_args = ''
instance_label = ''
# For instance resource type, the datasource expects the uuid of the
# instance to be assigned to a label in the prometheus metrics, with a
# specific key value.
if resource_type == 'compute_node':
instance_label = self._resolve_prometheus_instance_label(
resource.hostname)
elif resource_type == 'instance':
instance_label = resource.uuid
# For ram_allocated and root_disk size metrics there are no valid
# values in the prometheus backend store. We rely in the values
# provided in the vms inventory.
if meter == 'instance.memory':
return float(resource.memory)
elif meter == 'instance.disk':
return float(resource.disk)
else:
LOG.warning(
"Prometheus data source does not currently support "
"resource_type %s", resource_type
)
return None
promql_aggregate = self._resolve_prometheus_aggregate(aggregate, meter)
query_args = self._build_prometheus_query(
promql_aggregate, meter, instance_label, period, resource
)
if not query_args:
LOG.error("Cannot proceed without valid prometheus query")
return None
result = self.query_retry(
self.prometheus.query, query_args,
ignored_exc=prometheus_client.PrometheusAPIClientError,
)
return float(result[0].value) if result else None
def statistic_series(self, resource=None, resource_type=None,
meter_name=None, start_time=None, end_time=None,
granularity=300):
raise NotImplementedError(
_('Prometheus helper currently does not support statistic_series. '
'This can be considered for future enhancement.'))
def _invert_max_min_aggregate(self, agg):
"""Invert max and min for node/host metric queries from node-exporter
because we query for 'idle'/'unused' cpu and memory.
For Watcher 'max cpu used' we query for prometheus 'min idle time'.
For Watcher 'max memory used' we retrieve min 'unused'/'available'
memory from Prometheus. This internal function is used exclusively
by get_host_cpu_usage and get_host_ram_usage.
:param agg: the metric collection aggregate
:return: a String aggregate
"""
if agg == 'max':
return 'min'
elif agg == 'min':
return 'max'
return agg
def get_host_cpu_usage(self, resource, period=300,
aggregate="mean", granularity=None):
"""Query prometheus for node_cpu_seconds_total
This calculates the host cpu usage and returns it as a percentage
The calculation is made by using the cpu 'idle' time, per
instance (so all CPUs are included). For example the query looks like
(100 - (avg by (fqdn)(rate(node_cpu_seconds_total
{mode='idle',fqdn='compute1.example.com'}[300s])) * 100))
"""
aggregate = self._invert_max_min_aggregate(aggregate)
cpu_usage = self.statistic_aggregation(
resource, 'compute_node',
'host_cpu_usage', period=period,
granularity=granularity, aggregate=aggregate)
return float(cpu_usage) if cpu_usage else None
def get_host_ram_usage(self, resource, period=300,
aggregate="mean", granularity=None):
aggregate = self._invert_max_min_aggregate(aggregate)
ram_usage = self.statistic_aggregation(
resource, 'compute_node',
'host_ram_usage', period=period,
granularity=granularity, aggregate=aggregate)
return float(ram_usage) if ram_usage else None
def get_instance_ram_usage(self, resource, period=300,
aggregate="mean", granularity=None):
ram_usage = self.statistic_aggregation(
resource, 'instance',
'instance_ram_usage', period=period,
granularity=granularity, aggregate=aggregate)
return ram_usage
def get_instance_cpu_usage(self, resource, period=300,
aggregate="mean", granularity=None):
cpu_usage = self.statistic_aggregation(
resource, 'instance',
'instance_cpu_usage', period=period,
granularity=granularity, aggregate=aggregate)
return cpu_usage
def get_instance_ram_allocated(self, resource, period=300,
aggregate="mean", granularity=None):
ram_allocated = self.statistic_aggregation(
resource, 'instance',
'instance_ram_allocated', period=period,
granularity=granularity, aggregate=aggregate)
return ram_allocated
def get_instance_root_disk_size(self, resource, period=300,
aggregate="mean", granularity=None):
root_disk_size = self.statistic_aggregation(
resource, 'instance',
'instance_root_disk_size', period=period,
granularity=granularity, aggregate=aggregate)
return root_disk_size

View File

@@ -0,0 +1,61 @@
# Copyright 2025 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from unittest import mock
from observabilityclient.utils import metric_utils as obs_client_utils
from oslo_config import cfg
from watcher.decision_engine.datasources import aetos as aetos_helper
from watcher.tests import base
class TestAetosHelper(base.BaseTestCase):
def setUp(self):
super(TestAetosHelper, self).setUp()
with mock.patch.object(
aetos_helper.AetosHelper, '_setup_prometheus_client'
):
self.helper = aetos_helper.AetosHelper(mock.Mock())
def test_get_fqdn_label(self):
fqdn = 'fqdn_label'
cfg.CONF.aetos_client.fqdn_label = fqdn
self.assertEqual(
fqdn,
self.helper._get_fqdn_label()
)
def test_get_instance_uuid_label(self):
instance_uuid = 'instance_uuid_label'
cfg.CONF.aetos_client.instance_uuid_label = instance_uuid
self.assertEqual(
instance_uuid,
self.helper._get_instance_uuid_label()
)
@mock.patch.object(obs_client_utils, 'get_prom_client_from_keystone')
def test_setup_prometheus_client(self, mock_get_prom_client):
cfg.CONF.aetos_client.interface = 'internal'
cfg.CONF.aetos_client.region_name = 'RegionTwo'
opts = {'interface': 'internal',
'region_name': 'RegionTwo',
'service_type': 'metric-storage'}
osc = mock.Mock()
osc.session = mock.Mock()
aetos_helper.AetosHelper(osc)
mock_get_prom_client.assert_called_once_with(
osc.session, adapter_options=opts)

View File

@@ -19,10 +19,12 @@ from unittest import mock
from unittest.mock import MagicMock
from watcher.common import exception
from watcher.decision_engine.datasources import aetos
from watcher.decision_engine.datasources import gnocchi
from watcher.decision_engine.datasources import grafana
from watcher.decision_engine.datasources import manager as ds_manager
from watcher.decision_engine.datasources import monasca
from watcher.decision_engine.datasources import prometheus
from watcher.tests import base
@@ -156,3 +158,48 @@ class TestDataSourceManager(base.BaseTestCase):
self.assertRaises(exception.InvalidParameter, manager.get_backend, [])
self.assertRaises(exception.InvalidParameter, manager.get_backend,
None)
def test_datasource_validation_prometheus_and_aetos_conflict(self):
"""Test having both prometheus and aetos datasources raises error"""
conflicting_datasources = [
prometheus.PrometheusHelper.NAME,
aetos.AetosHelper.NAME
]
dsmcfg = self._dsm_config(datasources=conflicting_datasources)
self.assertRaises(
exception.DataSourceConfigConflict,
self._dsm,
config=dsmcfg
)
def test_datasource_validation_single_prometheus_ok(self):
"""Test that having only prometheus datasource works"""
prometheus_datasource = [prometheus.PrometheusHelper.NAME]
dsmcfg = self._dsm_config(datasources=prometheus_datasource)
# Should not raise any exception
manager = self._dsm(config=dsmcfg)
self.assertIsNotNone(manager)
def test_datasource_validation_single_aetos_ok(self):
"""Test that having only aetos datasource works"""
aetos_datasource = [aetos.AetosHelper.NAME]
dsmcfg = self._dsm_config(datasources=aetos_datasource)
# Should not raise any exception
manager = self._dsm(config=dsmcfg)
self.assertIsNotNone(manager)
def test_datasource_validation_mixed_datasources_ok(self):
"""Test mixing aetos with other non-prometheus datasources works"""
mixed_datasources = [
aetos.AetosHelper.NAME,
gnocchi.GnocchiHelper.NAME,
monasca.MonascaHelper.NAME
]
dsmcfg = self._dsm_config(datasources=mixed_datasources)
# Should not raise any exception
manager = self._dsm(config=dsmcfg)
self.assertIsNotNone(manager)

View File

@@ -0,0 +1,690 @@
# Copyright 2025 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from unittest import mock
from observabilityclient import prometheus_client
from watcher.common import exception
from watcher.decision_engine.datasources import prometheus_base
from watcher.tests import base
class TestPrometheusBase(base.BaseTestCase):
"""Test class for Prometheus base datasource class.
This class contains tests for the PrometheusBase class
which is used by the AetosHelper and PrometheusHelper classes.
"""
def setUp(self):
super(TestPrometheusBase, self).setUp()
with (
mock.patch.object(
prometheus_base.PrometheusBase, '_setup_prometheus_client',
return_value=prometheus_client.PrometheusAPIClient('')),
mock.patch.object(
prometheus_base.PrometheusBase, '_get_fqdn_label',
return_value='fqdn'),
mock.patch.object(
prometheus_base.PrometheusBase, '_get_instance_uuid_label',
return_value='resource'),
mock.patch.object(
prometheus_client.PrometheusAPIClient, '_get',
return_value={'data': {'activeTargets': [
{'labels': {
'fqdn': 'marios-env.controlplane.domain',
'instance': '10.0.1.2:9100', 'job': 'node',
}},
{'labels': {
'fqdn': 'marios-env-again.controlplane.domain',
'instance': 'localhost:9100', 'job': 'node'
}}
]}})
):
self.helper = prometheus_base.PrometheusBase()
self.helper._get_instance_uuid_label = mock.Mock(
return_value='resource')
self.helper._get_fqdn_label = mock.Mock(
return_value='fqdn')
stat_agg_patcher = mock.patch.object(
self.helper, 'statistic_aggregation',
spec=prometheus_base.PrometheusBase.statistic_aggregation)
self.mock_aggregation = stat_agg_patcher.start()
self.addCleanup(stat_agg_patcher.stop)
self.mock_instance = mock.Mock(
uuid='uuid-0',
memory=512,
disk=2,
vcpus=2)
def _create_helper(self, fqdn_label='fqdn',
instance_uuid_label='resource'):
with (
mock.patch.object(
prometheus_base.PrometheusBase, '_setup_prometheus_client',
return_value=prometheus_client.PrometheusAPIClient('')
),
mock.patch.object(
prometheus_base.PrometheusBase,
'_get_fqdn_label', return_value=fqdn_label
),
mock.patch.object(
prometheus_base.PrometheusBase,
'_get_instance_uuid_label',
return_value=instance_uuid_label
)):
helper = prometheus_base.PrometheusBase()
helper._get_instance_uuid_label = mock.Mock(
return_value=instance_uuid_label)
helper._get_fqdn_label = mock.Mock(
return_value=fqdn_label)
return helper
@mock.patch.object(prometheus_client.PrometheusAPIClient, 'query')
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_prometheus_statistic_aggregation(self, mock_prometheus_get,
mock_prometheus_query):
mock_node = mock.Mock(
uuid='1234',
hostname='marios-env.controlplane.domain')
expected_cpu_usage = 3.2706140350701673
mock_prom_metric = mock.Mock(
labels={'instance': '10.0.1.2:9100'},
timestamp=1731065985.408,
value=expected_cpu_usage
)
mock_prometheus_query.return_value = [mock_prom_metric]
mock_prometheus_get.return_value = {'data': {'activeTargets': [
{'labels': {
'fqdn': 'marios-env.controlplane.domain',
'instance': '10.0.1.2:9100', 'job': 'node',
}}]}}
helper = self._create_helper()
result = helper.statistic_aggregation(
resource=mock_node,
resource_type='compute_node',
meter_name='host_cpu_usage',
period=300,
aggregate='mean',
granularity=300,
)
self.assertEqual(expected_cpu_usage, result)
mock_prometheus_query.assert_called_once_with(
"100 - (avg by (fqdn)(rate(node_cpu_seconds_total"
"{mode='idle',fqdn='marios-env.controlplane.domain'}[300s]))"
" * 100)")
@mock.patch.object(prometheus_client.PrometheusAPIClient, 'query')
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_get_instance_cpu_usage(self, mock_prometheus_get,
mock_prometheus_query):
mock_instance = self.mock_instance
expected_cpu_usage = 13.2706140350701673
mock_prom_metric = mock.Mock(
labels={'resource': 'uuid-0'},
timestamp=1731065985.408,
value=expected_cpu_usage
)
mock_prometheus_query.return_value = [mock_prom_metric]
helper = self._create_helper()
cpu_usage = helper.get_instance_cpu_usage(mock_instance)
self.assertIsInstance(cpu_usage, float)
self.assertEqual(expected_cpu_usage, cpu_usage)
@mock.patch.object(prometheus_client.PrometheusAPIClient, 'query')
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_get_instance_ram_usage(self, mock_prometheus_get,
mock_prometheus_query):
mock_instance = self.mock_instance
expected_ram_usage = 49.86
mock_prom_metric = mock.Mock(
labels={'resource': 'uuid-0'},
timestamp=1731065985.408,
value=expected_ram_usage
)
mock_prometheus_query.return_value = [mock_prom_metric]
helper = self._create_helper()
ram_usage = helper.get_instance_ram_usage(
mock_instance, period=222, aggregate="max",
granularity=200)
self.assertIsInstance(ram_usage, float)
self.assertEqual(expected_ram_usage, ram_usage)
@mock.patch.object(prometheus_client.PrometheusAPIClient, 'query')
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_get_instance_ram_allocated(self, mock_prometheus_get,
mock_prometheus_query):
mock_instance = self.mock_instance
helper = self._create_helper()
ram_allocated = helper.get_instance_ram_allocated(mock_instance,
period=222,
aggregate="max")
self.assertIsInstance(ram_allocated, float)
self.assertEqual(512, ram_allocated)
@mock.patch.object(prometheus_client.PrometheusAPIClient, 'query')
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_get_instance_root_disk_size(self, mock_prometheus_get,
mock_prometheus_query):
mock_instance = self.mock_instance
helper = self._create_helper()
disk_size = helper.get_instance_root_disk_size(mock_instance,
period=331,
aggregate="avg")
self.assertIsInstance(disk_size, float)
self.assertEqual(2, disk_size)
@mock.patch.object(prometheus_client.PrometheusAPIClient, 'query')
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_prometheus_stt_agg_instance_cpu_usage(self, mock_prometheus_get,
mock_prometheus_query):
mock_instance = self.mock_instance
expected_cpu_usage = 13.2706140350701673
mock_prom_metric = mock.Mock(
labels={'resource': 'uuid-0'},
timestamp=1731065985.408,
value=expected_cpu_usage
)
mock_prometheus_query.return_value = [mock_prom_metric]
helper = self._create_helper()
result_cpu = helper.statistic_aggregation(
resource=mock_instance,
resource_type='instance',
meter_name='instance_cpu_usage',
period=300,
granularity=300,
aggregate='mean',
)
self.assertEqual(expected_cpu_usage, result_cpu)
self.assertIsInstance(result_cpu, float)
mock_prometheus_query.assert_called_once_with(
"clamp_max((avg by (resource)(rate("
"ceilometer_cpu{resource='uuid-0'}[300s]))"
"/10e+8) *(100/2), 100)"
)
@mock.patch.object(prometheus_client.PrometheusAPIClient, 'query')
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_prometheus_stt_agg_instance_ram_usage(self, mock_prometheus_get,
mock_prometheus_query):
mock_instance = self.mock_instance
expected_ram_usage = 49.86
mock_prom_metric = mock.Mock(
labels={'resource': 'uuid-0'},
timestamp=1731065985.408,
value=expected_ram_usage
)
mock_prometheus_query.return_value = [mock_prom_metric]
helper = self._create_helper()
result_ram_usage = helper.statistic_aggregation(
resource=mock_instance,
resource_type='instance',
meter_name='instance_ram_usage',
period=300,
granularity=300,
aggregate='mean',
)
self.assertEqual(expected_ram_usage, result_ram_usage)
self.assertIsInstance(result_ram_usage, float)
mock_prometheus_query.assert_called_with(
"avg_over_time(ceilometer_memory_usage{resource='uuid-0'}[300s])"
)
@mock.patch.object(prometheus_client.PrometheusAPIClient, 'query')
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_prometheus_stt_agg_instance_root_size(self, mock_prometheus_get,
mock_prometheus_query):
mock_instance = self.mock_instance
helper = self._create_helper()
result_disk = helper.statistic_aggregation(
resource=mock_instance,
resource_type='instance',
meter_name='instance_root_disk_size',
period=300,
granularity=300,
aggregate='mean',
)
self.assertEqual(2, result_disk)
self.assertIsInstance(result_disk, float)
@mock.patch.object(prometheus_client.PrometheusAPIClient, 'query')
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_prometheus_stt_agg_instance_ram_alloc(self, mock_prometheus_get,
mock_prometheus_query):
mock_instance = self.mock_instance
helper = self._create_helper()
result_memory = helper.statistic_aggregation(
resource=mock_instance,
resource_type='instance',
meter_name='instance_ram_allocated',
period=300,
granularity=300,
aggregate='mean',
)
self.assertEqual(512, result_memory)
self.assertIsInstance(result_memory, float)
def test_statistic_aggregation_metric_unavailable(self):
self.assertRaisesRegex(
NotImplementedError, 'does not support statistic_series',
self.helper.statistic_series
)
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_prometheus_list_metrics(self, mock_prometheus_get):
expected_metrics = set(
['go_gc_duration_seconds', 'go_gc_duration_seconds_count',
'go_gc_duration_seconds_sum', 'go_goroutines',]
)
mock_prometheus_get.return_value = {
'status': 'success', 'data': [
'go_gc_duration_seconds', 'go_gc_duration_seconds_count',
'go_gc_duration_seconds_sum', 'go_goroutines',
]
}
result = self.helper.list_metrics()
self.assertEqual(expected_metrics, result)
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_prometheus_list_metrics_error(self, mock_prometheus_get):
mock_prometheus_get.side_effect = (
prometheus_client.PrometheusAPIClientError("nope"))
result = self.helper.list_metrics()
self.assertEqual(set(), result)
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_prometheus_check_availability(self, mock_prometheus_get):
mock_prometheus_get.return_value = {
'status': 'success',
'data': {
'startTime': '2024-11-05T12:59:56.962333207Z',
'CWD': '/prometheus', 'reloadConfigSuccess': True,
'lastConfigTime': '2024-11-05T12:59:56Z',
'corruptionCount': 0, 'goroutineCount': 30,
'GOMAXPROCS': 8, 'GOMEMLIMIT': 9223372036854775807,
'GOGC': '75', 'GODEBUG': '', 'storageRetention': '15d'
}
}
result = self.helper.check_availability()
self.assertEqual('available', result)
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_prometheus_check_availability_error(self, mock_prometheus_get):
mock_prometheus_get.side_effect = (
prometheus_client.PrometheusAPIClientError("nope"))
result = self.helper.check_availability()
self.assertEqual('not available', result)
def test_get_host_cpu_usage(self):
cpu_use = self.helper.get_host_cpu_usage('someNode', 345, 'mean', 300)
self.assertIsInstance(cpu_use, float)
self.mock_aggregation.assert_called_once_with(
'someNode', 'compute_node', 'host_cpu_usage', period=345,
granularity=300, aggregate='mean')
def test_get_host_cpu_usage_none(self):
self.mock_aggregation.return_value = None
cpu_use = self.helper.get_host_cpu_usage('someNode', 345, 'mean', 300)
self.assertIsNone(cpu_use)
def test_get_host_cpu_usage_max(self):
cpu_use = self.helper.get_host_cpu_usage('theNode', 223, 'max', 100)
self.assertIsInstance(cpu_use, float)
self.mock_aggregation.assert_called_once_with(
'theNode', 'compute_node', 'host_cpu_usage', period=223,
granularity=100, aggregate='min')
def test_get_host_cpu_usage_min(self):
cpu_use = self.helper.get_host_cpu_usage('theNode', 223, 'min', 100)
self.assertIsInstance(cpu_use, float)
self.mock_aggregation.assert_called_once_with(
'theNode', 'compute_node', 'host_cpu_usage', period=223,
granularity=100, aggregate='max')
def test_get_host_ram_usage(self):
ram_use = self.helper.get_host_ram_usage(
'anotherNode', 456, 'mean', 300)
self.assertIsInstance(ram_use, float)
self.mock_aggregation.assert_called_once_with(
'anotherNode', 'compute_node', 'host_ram_usage', period=456,
granularity=300, aggregate='mean')
def test_get_host_ram_usage_none(self):
self.mock_aggregation.return_value = None
ram_use = self.helper.get_host_ram_usage('NOPE', 234, 'mean', 567)
self.assertIsNone(ram_use, float)
self.mock_aggregation.assert_called()
self.mock_aggregation.assert_called_once_with(
'NOPE', 'compute_node', 'host_ram_usage', period=234,
granularity=567, aggregate='mean')
def test_get_host_ram_usage_max(self):
ram_use = self.helper.get_host_ram_usage(
'aNode', 456, 'max', 300)
self.assertIsInstance(ram_use, float)
self.mock_aggregation.assert_called_once_with(
'aNode', 'compute_node', 'host_ram_usage', period=456,
granularity=300, aggregate='min')
def test_get_host_ram_usage_min(self):
ram_use = self.helper.get_host_ram_usage(
'aNode', 456, 'min', 300)
self.assertIsInstance(ram_use, float)
self.mock_aggregation.assert_called_once_with(
'aNode', 'compute_node', 'host_ram_usage', period=456,
granularity=300, aggregate='max')
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_build_prometheus_fqdn_host_instance_map(
self, mock_prometheus_get):
mock_prometheus_get.return_value = {'data': {'activeTargets': [
{'labels': {
'fqdn': 'foo.controlplane.domain',
'instance': '10.1.2.1:9100', 'job': 'node',
}},
{'labels': {
'fqdn': 'bar.controlplane.domain',
'instance': '10.1.2.2:9100', 'job': 'node',
}},
{'labels': {
'fqdn': 'baz.controlplane.domain',
'instance': '10.1.2.3:9100', 'job': 'node',
}},
]}}
expected_fqdn_list = {'foo.controlplane.domain',
'bar.controlplane.domain',
'baz.controlplane.domain'}
expected_host_map = {'foo': 'foo.controlplane.domain',
'bar': 'bar.controlplane.domain',
'baz': 'baz.controlplane.domain'}
helper = self._create_helper()
self.assertEqual(helper.prometheus_fqdn_labels,
expected_fqdn_list)
self.assertEqual(helper.prometheus_host_instance_map,
expected_host_map)
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_build_prometheus_fqdn_host_instance_map_dupl_fqdn(
self, mock_prometheus_get):
mock_prometheus_get.return_value = {'data': {'activeTargets': [
{'labels': {
'fqdn': 'foo.controlplane.domain',
'instance': '10.1.2.1:9100', 'job': 'node',
}},
{'labels': {
'fqdn': 'foo.controlplane.domain',
'instance': '10.1.2.1:9229', 'job': 'podman',
}},
{'labels': {
'fqdn': 'bar.controlplane.domain',
'instance': '10.1.2.2:9100', 'job': 'node',
}},
{'labels': {
'fqdn': 'baz.controlplane.domain',
'instance': '10.1.2.3:9100', 'job': 'node',
}},
]}}
expected_fqdn_list = {'foo.controlplane.domain',
'bar.controlplane.domain',
'baz.controlplane.domain'}
expected_host_map = {'foo': 'foo.controlplane.domain',
'bar': 'bar.controlplane.domain',
'baz': 'baz.controlplane.domain'}
helper = self._create_helper()
self.assertEqual(helper.prometheus_fqdn_labels,
expected_fqdn_list)
self.assertEqual(helper.prometheus_host_instance_map,
expected_host_map)
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_missing_prometheus_fqdn_label(self, mock_prometheus_get):
mock_prometheus_get.return_value = {'data': {'activeTargets': [
{'labels': {
'instance': '10.1.2.1:9100', 'job': 'node',
}},
{'labels': {
'instance': '10.1.2.2:9100', 'job': 'node',
}},
]}}
helper = self._create_helper()
self.assertEqual(set(), helper.prometheus_fqdn_labels)
self.assertEqual({}, helper.prometheus_host_instance_map)
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_using_hostnames_not_fqdn(self, mock_prometheus_get):
mock_prometheus_get.return_value = {'data': {'activeTargets': [
{'labels': {
'fqdn': 'ena',
'instance': '10.1.2.1:9100', 'job': 'node',
}},
{'labels': {
'fqdn': 'dyo',
'instance': '10.1.2.2:9100', 'job': 'node',
}},
]}}
helper = self._create_helper()
expected_fqdn_list = {'ena', 'dyo'}
self.assertEqual(
helper.prometheus_fqdn_labels, expected_fqdn_list)
self.assertEqual({}, helper.prometheus_host_instance_map)
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_using_ips_not_fqdn(self, mock_prometheus_get):
mock_prometheus_get.return_value = {'data': {'activeTargets': [
{'labels': {
'ip_label': '10.1.2.1',
'instance': '10.1.2.1:9100', 'job': 'node',
}},
{'labels': {
'ip_label': '10.1.2.2',
'instance': '10.1.2.2:9100', 'job': 'node',
}},
]}}
helper = self._create_helper(fqdn_label='ip_label')
expected_fqdn_list = {'10.1.2.1', '10.1.2.2'}
self.assertEqual(
helper.prometheus_fqdn_labels, expected_fqdn_list)
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_override_prometheus_fqdn_label(self, mock_prometheus_get):
mock_prometheus_get.return_value = {'data': {'activeTargets': [
{'labels': {
'custom_fqdn_label': 'foo.controlplane.domain',
'instance': '10.1.2.1:9100', 'job': 'node',
}},
{'labels': {
'custom_fqdn_label': 'bar.controlplane.domain',
'instance': '10.1.2.2:9100', 'job': 'node',
}},
]}}
expected_fqdn_list = {'foo.controlplane.domain',
'bar.controlplane.domain'}
expected_host_map = {'foo': 'foo.controlplane.domain',
'bar': 'bar.controlplane.domain'}
helper = self._create_helper(fqdn_label='custom_fqdn_label')
self.assertEqual(helper.prometheus_fqdn_labels,
expected_fqdn_list)
self.assertEqual(helper.prometheus_host_instance_map,
expected_host_map)
def test_resolve_prometheus_instance_label(self):
expected_instance_label = 'marios-env.controlplane.domain'
result = self.helper._resolve_prometheus_instance_label(
'marios-env.controlplane.domain')
self.assertEqual(result, expected_instance_label)
result = self.helper._resolve_prometheus_instance_label(
'marios-env')
self.assertEqual(result, expected_instance_label)
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_resolve_prometheus_instance_label_none(self,
mock_prometheus_get):
mock_prometheus_get.return_value = {'data': {'activeTargets': []}}
result = self.helper._resolve_prometheus_instance_label('nope')
self.assertIsNone(result)
mock_prometheus_get.assert_called_once_with("targets?state=active")
def test_build_prometheus_query_node_cpu_avg_agg(self):
expected_query = (
"100 - (avg by (fqdn)(rate(node_cpu_seconds_total"
"{mode='idle',fqdn='a_host'}[111s])) * 100)")
result = self.helper._build_prometheus_query(
'avg', 'node_cpu_seconds_total', 'a_host', '111')
self.assertEqual(result, expected_query)
def test_build_prometheus_query_node_cpu_max_agg(self):
expected_query = (
"100 - (max by (fqdn)(rate(node_cpu_seconds_total"
"{mode='idle',fqdn='b_host'}[444s])) * 100)")
result = self.helper._build_prometheus_query(
'max', 'node_cpu_seconds_total', 'b_host', '444')
self.assertEqual(result, expected_query)
def test_build_prometheus_query_node_memory_avg_agg(self):
expected_query = (
"(node_memory_MemTotal_bytes{fqdn='c_host'} - avg_over_time"
"(node_memory_MemAvailable_bytes{fqdn='c_host'}[555s])) "
"/ 1024")
result = self.helper._build_prometheus_query(
'avg', 'node_memory_MemAvailable_bytes', 'c_host', '555')
self.assertEqual(result, expected_query)
def test_build_prometheus_query_node_memory_min_agg(self):
expected_query = (
"(node_memory_MemTotal_bytes{fqdn='d_host'} - min_over_time"
"(node_memory_MemAvailable_bytes{fqdn='d_host'}[222s])) "
"/ 1024")
result = self.helper._build_prometheus_query(
'min', 'node_memory_MemAvailable_bytes', 'd_host', '222')
self.assertEqual(result, expected_query)
def test_build_prometheus_query_node_cpu_avg_agg_custom_label(self):
self.helper.prometheus_fqdn_label = 'custom_fqdn_label'
expected_query = (
"100 - (avg by (custom_fqdn_label)(rate(node_cpu_seconds_total"
"{mode='idle',custom_fqdn_label='a_host'}[111s])) * 100)")
result = self.helper._build_prometheus_query(
'avg', 'node_cpu_seconds_total', 'a_host', '111')
self.assertEqual(result, expected_query)
def test_build_prometheus_query_node_memory_min_agg_custom_label(self):
self.helper.prometheus_fqdn_label = 'custom_fqdn'
expected_query = (
"(node_memory_MemTotal_bytes{custom_fqdn='d_host'} - min_over_time"
"(node_memory_MemAvailable_bytes{custom_fqdn='d_host'}[222s])) "
"/ 1024")
result = self.helper._build_prometheus_query(
'min', 'node_memory_MemAvailable_bytes', 'd_host', '222')
self.assertEqual(result, expected_query)
def test_build_prometheus_query_instance_memory_avg_agg(self):
expected_query = (
"avg_over_time(ceilometer_memory_usage{resource='uuid-0'}[555s])"
)
result = self.helper._build_prometheus_query(
'avg', 'ceilometer_memory_usage', 'uuid-0', '555')
self.assertEqual(result, expected_query)
def test_build_prometheus_query_instance_memory_min_agg(self):
expected_query = (
"min_over_time(ceilometer_memory_usage{resource='uuid-0'}[222s])"
)
result = self.helper._build_prometheus_query(
'min', 'ceilometer_memory_usage', 'uuid-0', '222')
self.assertEqual(result, expected_query)
def test_build_prometheus_query_instance_cpu_avg_agg(self):
expected_query = (
"clamp_max((avg by (resource)(rate("
"ceilometer_cpu{resource='uuid-0'}[222s]))"
"/10e+8) *(100/2), 100)"
)
result = self.helper._build_prometheus_query(
'avg', 'ceilometer_cpu', 'uuid-0', '222',
resource=self.mock_instance)
self.assertEqual(result, expected_query)
def test_build_prometheus_query_instance_cpu_max_agg(self):
expected_query = (
"clamp_max((max by (resource)(rate("
"ceilometer_cpu{resource='uuid-0'}[555s]))"
"/10e+8) *(100/4), 100)"
)
mock_instance = mock.Mock(
uuid='uuid-0',
memory=512,
disk=2,
vcpus=4)
result = self.helper._build_prometheus_query(
'max', 'ceilometer_cpu', 'uuid-0', '555', resource=mock_instance)
self.assertEqual(result, expected_query)
def test_build_prometheus_query_error(self):
self.assertRaisesRegex(
exception.InvalidParameter, 'Cannot process prometheus meter NOPE',
self.helper._build_prometheus_query,
'min', 'NOPE', 'the_host', '222'
)
self.assertRaisesRegex(
exception.InvalidParameter, 'instance_label None, period 333',
self.helper._build_prometheus_query,
'min', 'node_cpu_seconds_total', None, '333'
)
def test_resolve_prometheus_aggregate_vanilla(self):
result = self.helper._resolve_prometheus_aggregate('mean', 'foo')
self.assertEqual(result, 'avg')
result = self.helper._resolve_prometheus_aggregate('count', 'foo')
self.assertEqual(result, 'avg')
result = self.helper._resolve_prometheus_aggregate('max', 'foometric')
self.assertEqual(result, 'max')
result = self.helper._resolve_prometheus_aggregate('min', 'barmetric')
self.assertEqual(result, 'min')
def test_resolve_prometheus_aggregate_unknown(self):
self.assertRaisesRegex(
exception.InvalidParameter, 'Unknown Watcher aggregate NOPE.',
self.helper._resolve_prometheus_aggregate, 'NOPE', 'some_meter')
def test_prometheus_query_custom_uuid_label(self):
self.helper._get_instance_uuid_label.return_value = 'custom_uuid_label'
expected_query = (
"clamp_max((max by (custom_uuid_label)"
"(rate(ceilometer_cpu{custom_uuid_label='uuid-0'}[555s]))"
"/10e+8) *(100/4), 100)"
)
mock_instance = mock.Mock(
uuid='uuid-0',
memory=512,
disk=2,
vcpus=4)
result = self.helper._build_prometheus_query(
'max', 'ceilometer_cpu', 'uuid-0', '555', resource=mock_instance)
self.assertEqual(result, expected_query)

View File

@@ -23,34 +23,40 @@ from watcher.tests import base
class TestPrometheusHelper(base.BaseTestCase):
def setUp(self):
super(TestPrometheusHelper, self).setUp()
cfg.CONF.prometheus_client.host = "foobarbaz"
cfg.CONF.prometheus_client.port = "1234"
with mock.patch.object(
prometheus_client.PrometheusAPIClient, '_get',
return_value={'data': {'activeTargets': [
{'labels': {
'fqdn': 'marios-env.controlplane.domain',
'instance': '10.0.1.2:9100', 'job': 'node',
}},
{'labels': {
'fqdn': 'marios-env-again.controlplane.domain',
'instance': 'localhost:9100', 'job': 'node'
}}
]}}):
cfg.CONF.prometheus_client.host = "foobarbaz"
cfg.CONF.prometheus_client.port = "1234"
prometheus_helper.PrometheusHelper,
'_setup_prometheus_client'
):
self.helper = prometheus_helper.PrometheusHelper()
stat_agg_patcher = mock.patch.object(
self.helper, 'statistic_aggregation',
spec=prometheus_helper.PrometheusHelper.statistic_aggregation)
self.mock_aggregation = stat_agg_patcher.start()
self.addCleanup(stat_agg_patcher.stop)
self.mock_instance = mock.Mock(
uuid='uuid-0',
memory=512,
disk=2,
vcpus=2)
# Set up patches for all methods used inside the
# _setup_prometheus_client
self.mock_init = mock.patch.object(
prometheus_client.PrometheusAPIClient, '__init__',
return_value=None).start()
self.addCleanup(self.mock_init.stop)
self.mock_set_ca_cert = mock.patch.object(
prometheus_client.PrometheusAPIClient, 'set_ca_cert').start()
self.addCleanup(self.mock_set_ca_cert.stop)
self.mock_set_client_cert = mock.patch.object(
prometheus_client.PrometheusAPIClient, 'set_client_cert').start()
self.addCleanup(self.mock_set_client_cert.stop)
self.mock_set_basic_auth = mock.patch.object(
prometheus_client.PrometheusAPIClient, 'set_basic_auth').start()
self.addCleanup(self.mock_set_basic_auth.stop)
self.mock_build_fqdn_labels = mock.patch.object(
prometheus_helper.PrometheusHelper,
'_build_prometheus_fqdn_labels').start()
self.addCleanup(self.mock_build_fqdn_labels.stop)
def test_unset_missing_prometheus_host(self):
cfg.CONF.prometheus_client.port = '123'
@@ -113,601 +119,50 @@ class TestPrometheusHelper(base.BaseTestCase):
prometheus_helper.PrometheusHelper
)
@mock.patch.object(prometheus_client.PrometheusAPIClient, 'query')
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_prometheus_statistic_aggregation(self, mock_prometheus_get,
mock_prometheus_query):
mock_node = mock.Mock(
uuid='1234',
hostname='marios-env.controlplane.domain')
expected_cpu_usage = 3.2706140350701673
mock_prom_metric = mock.Mock(
labels={'instance': '10.0.1.2:9100'},
timestamp=1731065985.408,
value=expected_cpu_usage
)
mock_prometheus_query.return_value = [mock_prom_metric]
mock_prometheus_get.return_value = {'data': {'activeTargets': [
{'labels': {
'fqdn': 'marios-env.controlplane.domain',
'instance': '10.0.1.2:9100', 'job': 'node',
}}]}}
helper = prometheus_helper.PrometheusHelper()
result = helper.statistic_aggregation(
resource=mock_node,
resource_type='compute_node',
meter_name='host_cpu_usage',
period=300,
aggregate='mean',
granularity=300,
)
self.assertEqual(expected_cpu_usage, result)
mock_prometheus_query.assert_called_once_with(
"100 - (avg by (fqdn)(rate(node_cpu_seconds_total"
"{mode='idle',fqdn='marios-env.controlplane.domain'}[300s]))"
" * 100)")
@mock.patch.object(prometheus_client.PrometheusAPIClient, 'query')
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_get_instance_cpu_usage(self, mock_prometheus_get,
mock_prometheus_query):
mock_instance = self.mock_instance
expected_cpu_usage = 13.2706140350701673
mock_prom_metric = mock.Mock(
labels={'resource': 'uuid-0'},
timestamp=1731065985.408,
value=expected_cpu_usage
)
mock_prometheus_query.return_value = [mock_prom_metric]
helper = prometheus_helper.PrometheusHelper()
cpu_usage = helper.get_instance_cpu_usage(mock_instance)
self.assertIsInstance(cpu_usage, float)
self.assertEqual(expected_cpu_usage, cpu_usage)
@mock.patch.object(prometheus_client.PrometheusAPIClient, 'query')
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_get_instance_ram_usage(self, mock_prometheus_get,
mock_prometheus_query):
mock_instance = self.mock_instance
expected_ram_usage = 49.86
mock_prom_metric = mock.Mock(
labels={'resource': 'uuid-0'},
timestamp=1731065985.408,
value=expected_ram_usage
)
mock_prometheus_query.return_value = [mock_prom_metric]
helper = prometheus_helper.PrometheusHelper()
ram_usage = helper.get_instance_ram_usage(
mock_instance, period=222, aggregate="max",
granularity=200)
self.assertIsInstance(ram_usage, float)
self.assertEqual(expected_ram_usage, ram_usage)
@mock.patch.object(prometheus_client.PrometheusAPIClient, 'query')
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_get_instance_ram_allocated(self, mock_prometheus_get,
mock_prometheus_query):
mock_instance = self.mock_instance
helper = prometheus_helper.PrometheusHelper()
ram_allocated = helper.get_instance_ram_allocated(mock_instance,
period=222,
aggregate="max")
self.assertIsInstance(ram_allocated, float)
self.assertEqual(512, ram_allocated)
@mock.patch.object(prometheus_client.PrometheusAPIClient, 'query')
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_get_instance_root_disk_size(self, mock_prometheus_get,
mock_prometheus_query):
mock_instance = self.mock_instance
helper = prometheus_helper.PrometheusHelper()
disk_size = helper.get_instance_root_disk_size(mock_instance,
period=331,
aggregate="avg")
self.assertIsInstance(disk_size, float)
self.assertEqual(2, disk_size)
@mock.patch.object(prometheus_client.PrometheusAPIClient, 'query')
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_prometheus_stt_agg_instance_cpu_usage(self, mock_prometheus_get,
mock_prometheus_query):
mock_instance = self.mock_instance
expected_cpu_usage = 13.2706140350701673
mock_prom_metric = mock.Mock(
labels={'resource': 'uuid-0'},
timestamp=1731065985.408,
value=expected_cpu_usage
)
mock_prometheus_query.return_value = [mock_prom_metric]
helper = prometheus_helper.PrometheusHelper()
result_cpu = helper.statistic_aggregation(
resource=mock_instance,
resource_type='instance',
meter_name='instance_cpu_usage',
period=300,
granularity=300,
aggregate='mean',
)
self.assertEqual(expected_cpu_usage, result_cpu)
self.assertIsInstance(result_cpu, float)
mock_prometheus_query.assert_called_once_with(
"clamp_max((avg by (resource)(rate("
"ceilometer_cpu{resource='uuid-0'}[300s]))"
"/10e+8) *(100/2), 100)"
)
@mock.patch.object(prometheus_client.PrometheusAPIClient, 'query')
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_prometheus_stt_agg_instance_ram_usage(self, mock_prometheus_get,
mock_prometheus_query):
mock_instance = self.mock_instance
expected_ram_usage = 49.86
mock_prom_metric = mock.Mock(
labels={'resource': 'uuid-0'},
timestamp=1731065985.408,
value=expected_ram_usage
)
mock_prometheus_query.return_value = [mock_prom_metric]
helper = prometheus_helper.PrometheusHelper()
result_ram_usage = helper.statistic_aggregation(
resource=mock_instance,
resource_type='instance',
meter_name='instance_ram_usage',
period=300,
granularity=300,
aggregate='mean',
)
self.assertEqual(expected_ram_usage, result_ram_usage)
self.assertIsInstance(result_ram_usage, float)
mock_prometheus_query.assert_called_with(
"avg_over_time(ceilometer_memory_usage{resource='uuid-0'}[300s])"
)
@mock.patch.object(prometheus_client.PrometheusAPIClient, 'query')
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_prometheus_stt_agg_instance_root_size(self, mock_prometheus_get,
mock_prometheus_query):
mock_instance = self.mock_instance
helper = prometheus_helper.PrometheusHelper()
result_disk = helper.statistic_aggregation(
resource=mock_instance,
resource_type='instance',
meter_name='instance_root_disk_size',
period=300,
granularity=300,
aggregate='mean',
)
self.assertEqual(2, result_disk)
self.assertIsInstance(result_disk, float)
@mock.patch.object(prometheus_client.PrometheusAPIClient, 'query')
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_prometheus_stt_agg_instance_ram_alloc(self, mock_prometheus_get,
mock_prometheus_query):
mock_instance = self.mock_instance
helper = prometheus_helper.PrometheusHelper()
result_memory = helper.statistic_aggregation(
resource=mock_instance,
resource_type='instance',
meter_name='instance_ram_allocated',
period=300,
granularity=300,
aggregate='mean',
)
self.assertEqual(512, result_memory)
self.assertIsInstance(result_memory, float)
def test_statistic_aggregation_metric_unavailable(self):
self.assertRaisesRegex(
NotImplementedError, 'does not support statistic_series',
self.helper.statistic_series
)
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_prometheus_list_metrics(self, mock_prometheus_get):
expected_metrics = set(
['go_gc_duration_seconds', 'go_gc_duration_seconds_count',
'go_gc_duration_seconds_sum', 'go_goroutines',]
)
mock_prometheus_get.return_value = {
'status': 'success', 'data': [
'go_gc_duration_seconds', 'go_gc_duration_seconds_count',
'go_gc_duration_seconds_sum', 'go_goroutines',
]
}
result = self.helper.list_metrics()
self.assertEqual(expected_metrics, result)
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_prometheus_list_metrics_error(self, mock_prometheus_get):
mock_prometheus_get.side_effect = (
prometheus_client.PrometheusAPIClientError("nope"))
result = self.helper.list_metrics()
self.assertEqual(set(), result)
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_prometheus_check_availability(self, mock_prometheus_get):
mock_prometheus_get.return_value = {
'status': 'success',
'data': {
'startTime': '2024-11-05T12:59:56.962333207Z',
'CWD': '/prometheus', 'reloadConfigSuccess': True,
'lastConfigTime': '2024-11-05T12:59:56Z',
'corruptionCount': 0, 'goroutineCount': 30,
'GOMAXPROCS': 8, 'GOMEMLIMIT': 9223372036854775807,
'GOGC': '75', 'GODEBUG': '', 'storageRetention': '15d'
}
}
result = self.helper.check_availability()
self.assertEqual('available', result)
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_prometheus_check_availability_error(self, mock_prometheus_get):
mock_prometheus_get.side_effect = (
prometheus_client.PrometheusAPIClientError("nope"))
result = self.helper.check_availability()
self.assertEqual('not available', result)
def test_get_host_cpu_usage(self):
cpu_use = self.helper.get_host_cpu_usage('someNode', 345, 'mean', 300)
self.assertIsInstance(cpu_use, float)
self.mock_aggregation.assert_called_once_with(
'someNode', 'compute_node', 'host_cpu_usage', period=345,
granularity=300, aggregate='mean')
def test_get_host_cpu_usage_none(self):
self.mock_aggregation.return_value = None
cpu_use = self.helper.get_host_cpu_usage('someNode', 345, 'mean', 300)
self.assertIsNone(cpu_use)
def test_get_host_cpu_usage_max(self):
cpu_use = self.helper.get_host_cpu_usage('theNode', 223, 'max', 100)
self.assertIsInstance(cpu_use, float)
self.mock_aggregation.assert_called_once_with(
'theNode', 'compute_node', 'host_cpu_usage', period=223,
granularity=100, aggregate='min')
def test_get_host_cpu_usage_min(self):
cpu_use = self.helper.get_host_cpu_usage('theNode', 223, 'min', 100)
self.assertIsInstance(cpu_use, float)
self.mock_aggregation.assert_called_once_with(
'theNode', 'compute_node', 'host_cpu_usage', period=223,
granularity=100, aggregate='max')
def test_get_host_ram_usage(self):
ram_use = self.helper.get_host_ram_usage(
'anotherNode', 456, 'mean', 300)
self.assertIsInstance(ram_use, float)
self.mock_aggregation.assert_called_once_with(
'anotherNode', 'compute_node', 'host_ram_usage', period=456,
granularity=300, aggregate='mean')
def test_get_host_ram_usage_none(self):
self.mock_aggregation.return_value = None
ram_use = self.helper.get_host_ram_usage('NOPE', 234, 'mean', 567)
self.assertIsNone(ram_use, float)
self.mock_aggregation.assert_called()
self.mock_aggregation.assert_called_once_with(
'NOPE', 'compute_node', 'host_ram_usage', period=234,
granularity=567, aggregate='mean')
def test_get_host_ram_usage_max(self):
ram_use = self.helper.get_host_ram_usage(
'aNode', 456, 'max', 300)
self.assertIsInstance(ram_use, float)
self.mock_aggregation.assert_called_once_with(
'aNode', 'compute_node', 'host_ram_usage', period=456,
granularity=300, aggregate='min')
def test_get_host_ram_usage_min(self):
ram_use = self.helper.get_host_ram_usage(
'aNode', 456, 'min', 300)
self.assertIsInstance(ram_use, float)
self.mock_aggregation.assert_called_once_with(
'aNode', 'compute_node', 'host_ram_usage', period=456,
granularity=300, aggregate='max')
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_build_prometheus_fqdn_host_instance_map(
self, mock_prometheus_get):
mock_prometheus_get.return_value = {'data': {'activeTargets': [
{'labels': {
'fqdn': 'foo.controlplane.domain',
'instance': '10.1.2.1:9100', 'job': 'node',
}},
{'labels': {
'fqdn': 'bar.controlplane.domain',
'instance': '10.1.2.2:9100', 'job': 'node',
}},
{'labels': {
'fqdn': 'baz.controlplane.domain',
'instance': '10.1.2.3:9100', 'job': 'node',
}},
]}}
expected_fqdn_list = {'foo.controlplane.domain',
'bar.controlplane.domain',
'baz.controlplane.domain'}
expected_host_map = {'foo': 'foo.controlplane.domain',
'bar': 'bar.controlplane.domain',
'baz': 'baz.controlplane.domain'}
helper = prometheus_helper.PrometheusHelper()
self.assertEqual(helper.prometheus_fqdn_labels,
expected_fqdn_list)
self.assertEqual(helper.prometheus_host_instance_map,
expected_host_map)
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_build_prometheus_fqdn_host_instance_map_dupl_fqdn(
self, mock_prometheus_get):
mock_prometheus_get.return_value = {'data': {'activeTargets': [
{'labels': {
'fqdn': 'foo.controlplane.domain',
'instance': '10.1.2.1:9100', 'job': 'node',
}},
{'labels': {
'fqdn': 'foo.controlplane.domain',
'instance': '10.1.2.1:9229', 'job': 'podman',
}},
{'labels': {
'fqdn': 'bar.controlplane.domain',
'instance': '10.1.2.2:9100', 'job': 'node',
}},
{'labels': {
'fqdn': 'baz.controlplane.domain',
'instance': '10.1.2.3:9100', 'job': 'node',
}},
]}}
expected_fqdn_list = {'foo.controlplane.domain',
'bar.controlplane.domain',
'baz.controlplane.domain'}
expected_host_map = {'foo': 'foo.controlplane.domain',
'bar': 'bar.controlplane.domain',
'baz': 'baz.controlplane.domain'}
helper = prometheus_helper.PrometheusHelper()
self.assertEqual(helper.prometheus_fqdn_labels,
expected_fqdn_list)
self.assertEqual(helper.prometheus_host_instance_map,
expected_host_map)
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_missing_prometheus_fqdn_label(self, mock_prometheus_get):
mock_prometheus_get.return_value = {'data': {'activeTargets': [
{'labels': {
'instance': '10.1.2.1:9100', 'job': 'node',
}},
{'labels': {
'instance': '10.1.2.2:9100', 'job': 'node',
}},
]}}
helper = prometheus_helper.PrometheusHelper()
self.assertEqual(set(), helper.prometheus_fqdn_labels)
self.assertEqual({}, helper.prometheus_host_instance_map)
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_using_hostnames_not_fqdn(self, mock_prometheus_get):
mock_prometheus_get.return_value = {'data': {'activeTargets': [
{'labels': {
'fqdn': 'ena',
'instance': '10.1.2.1:9100', 'job': 'node',
}},
{'labels': {
'fqdn': 'dyo',
'instance': '10.1.2.2:9100', 'job': 'node',
}},
]}}
helper = prometheus_helper.PrometheusHelper()
expected_fqdn_list = {'ena', 'dyo'}
def test_get_fqdn_label(self):
fqdn = 'fqdn_label'
cfg.CONF.prometheus_client.fqdn_label = fqdn
self.assertEqual(
helper.prometheus_fqdn_labels, expected_fqdn_list)
self.assertEqual({}, helper.prometheus_host_instance_map)
fqdn,
self.helper._get_fqdn_label()
)
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_using_ips_not_fqdn(self, mock_prometheus_get):
mock_prometheus_get.return_value = {'data': {'activeTargets': [
{'labels': {
'ip_label': '10.1.2.1',
'instance': '10.1.2.1:9100', 'job': 'node',
}},
{'labels': {
'ip_label': '10.1.2.2',
'instance': '10.1.2.2:9100', 'job': 'node',
}},
]}}
cfg.CONF.prometheus_client.fqdn_label = 'ip_label'
helper = prometheus_helper.PrometheusHelper()
expected_fqdn_list = {'10.1.2.1', '10.1.2.2'}
def test_get_instance_uuid_label(self):
instance_uuid = 'instance_uuid_label'
cfg.CONF.prometheus_client.instance_uuid_label = instance_uuid
self.assertEqual(
helper.prometheus_fqdn_labels, expected_fqdn_list)
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_override_prometheus_fqdn_label(self, mock_prometheus_get):
mock_prometheus_get.return_value = {'data': {'activeTargets': [
{'labels': {
'custom_fqdn_label': 'foo.controlplane.domain',
'instance': '10.1.2.1:9100', 'job': 'node',
}},
{'labels': {
'custom_fqdn_label': 'bar.controlplane.domain',
'instance': '10.1.2.2:9100', 'job': 'node',
}},
]}}
expected_fqdn_list = {'foo.controlplane.domain',
'bar.controlplane.domain'}
expected_host_map = {'foo': 'foo.controlplane.domain',
'bar': 'bar.controlplane.domain'}
cfg.CONF.prometheus_client.fqdn_label = 'custom_fqdn_label'
helper = prometheus_helper.PrometheusHelper()
self.assertEqual(helper.prometheus_fqdn_labels,
expected_fqdn_list)
self.assertEqual(helper.prometheus_host_instance_map,
expected_host_map)
def test_resolve_prometheus_instance_label(self):
expected_instance_label = 'marios-env.controlplane.domain'
result = self.helper._resolve_prometheus_instance_label(
'marios-env.controlplane.domain')
self.assertEqual(result, expected_instance_label)
result = self.helper._resolve_prometheus_instance_label(
'marios-env')
self.assertEqual(result, expected_instance_label)
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_resolve_prometheus_instance_label_none(self,
mock_prometheus_get):
mock_prometheus_get.return_value = {'data': {'activeTargets': []}}
result = self.helper._resolve_prometheus_instance_label('nope')
self.assertIsNone(result)
mock_prometheus_get.assert_called_once_with("targets?state=active")
def test_build_prometheus_query_node_cpu_avg_agg(self):
expected_query = (
"100 - (avg by (fqdn)(rate(node_cpu_seconds_total"
"{mode='idle',fqdn='a_host'}[111s])) * 100)")
result = self.helper._build_prometheus_query(
'avg', 'node_cpu_seconds_total', 'a_host', '111')
self.assertEqual(result, expected_query)
def test_build_prometheus_query_node_cpu_max_agg(self):
expected_query = (
"100 - (max by (fqdn)(rate(node_cpu_seconds_total"
"{mode='idle',fqdn='b_host'}[444s])) * 100)")
result = self.helper._build_prometheus_query(
'max', 'node_cpu_seconds_total', 'b_host', '444')
self.assertEqual(result, expected_query)
def test_build_prometheus_query_node_memory_avg_agg(self):
expected_query = (
"(node_memory_MemTotal_bytes{fqdn='c_host'} - avg_over_time"
"(node_memory_MemAvailable_bytes{fqdn='c_host'}[555s])) "
"/ 1024")
result = self.helper._build_prometheus_query(
'avg', 'node_memory_MemAvailable_bytes', 'c_host', '555')
self.assertEqual(result, expected_query)
def test_build_prometheus_query_node_memory_min_agg(self):
expected_query = (
"(node_memory_MemTotal_bytes{fqdn='d_host'} - min_over_time"
"(node_memory_MemAvailable_bytes{fqdn='d_host'}[222s])) "
"/ 1024")
result = self.helper._build_prometheus_query(
'min', 'node_memory_MemAvailable_bytes', 'd_host', '222')
self.assertEqual(result, expected_query)
def test_build_prometheus_query_node_cpu_avg_agg_custom_label(self):
self.helper.prometheus_fqdn_label = 'custom_fqdn_label'
expected_query = (
"100 - (avg by (custom_fqdn_label)(rate(node_cpu_seconds_total"
"{mode='idle',custom_fqdn_label='a_host'}[111s])) * 100)")
result = self.helper._build_prometheus_query(
'avg', 'node_cpu_seconds_total', 'a_host', '111')
self.assertEqual(result, expected_query)
def test_build_prometheus_query_node_memory_min_agg_custom_label(self):
self.helper.prometheus_fqdn_label = 'custom_fqdn'
expected_query = (
"(node_memory_MemTotal_bytes{custom_fqdn='d_host'} - min_over_time"
"(node_memory_MemAvailable_bytes{custom_fqdn='d_host'}[222s])) "
"/ 1024")
result = self.helper._build_prometheus_query(
'min', 'node_memory_MemAvailable_bytes', 'd_host', '222')
self.assertEqual(result, expected_query)
def test_build_prometheus_query_instance_memory_avg_agg(self):
expected_query = (
"avg_over_time(ceilometer_memory_usage{resource='uuid-0'}[555s])"
)
result = self.helper._build_prometheus_query(
'avg', 'ceilometer_memory_usage', 'uuid-0', '555')
self.assertEqual(result, expected_query)
def test_build_prometheus_query_instance_memory_min_agg(self):
expected_query = (
"min_over_time(ceilometer_memory_usage{resource='uuid-0'}[222s])"
)
result = self.helper._build_prometheus_query(
'min', 'ceilometer_memory_usage', 'uuid-0', '222')
self.assertEqual(result, expected_query)
def test_build_prometheus_query_instance_cpu_avg_agg(self):
expected_query = (
"clamp_max((avg by (resource)(rate("
"ceilometer_cpu{resource='uuid-0'}[222s]))"
"/10e+8) *(100/2), 100)"
)
result = self.helper._build_prometheus_query(
'avg', 'ceilometer_cpu', 'uuid-0', '222',
resource=self.mock_instance)
self.assertEqual(result, expected_query)
def test_build_prometheus_query_instance_cpu_max_agg(self):
expected_query = (
"clamp_max((max by (resource)(rate("
"ceilometer_cpu{resource='uuid-0'}[555s]))"
"/10e+8) *(100/4), 100)"
)
mock_instance = mock.Mock(
uuid='uuid-0',
memory=512,
disk=2,
vcpus=4)
result = self.helper._build_prometheus_query(
'max', 'ceilometer_cpu', 'uuid-0', '555', resource=mock_instance)
self.assertEqual(result, expected_query)
def test_build_prometheus_query_error(self):
self.assertRaisesRegex(
exception.InvalidParameter, 'Cannot process prometheus meter NOPE',
self.helper._build_prometheus_query,
'min', 'NOPE', 'the_host', '222'
)
self.assertRaisesRegex(
exception.InvalidParameter, 'instance_label None, period 333',
self.helper._build_prometheus_query,
'min', 'node_cpu_seconds_total', None, '333'
instance_uuid,
self.helper._get_instance_uuid_label()
)
def test_resolve_prometheus_aggregate_vanilla(self):
result = self.helper._resolve_prometheus_aggregate('mean', 'foo')
self.assertEqual(result, 'avg')
result = self.helper._resolve_prometheus_aggregate('count', 'foo')
self.assertEqual(result, 'avg')
result = self.helper._resolve_prometheus_aggregate('max', 'foometric')
self.assertEqual(result, 'max')
result = self.helper._resolve_prometheus_aggregate('min', 'barmetric')
self.assertEqual(result, 'min')
def test_setup_prometheus_client_no_auth_no_tls(self):
cfg.CONF.prometheus_client.host = "somehost"
cfg.CONF.prometheus_client.port = "1234"
prometheus_helper.PrometheusHelper()
def test_resolve_prometheus_aggregate_unknown(self):
self.assertRaisesRegex(
exception.InvalidParameter, 'Unknown Watcher aggregate NOPE.',
self.helper._resolve_prometheus_aggregate, 'NOPE', 'some_meter')
self.mock_init.assert_called_once_with("somehost:1234")
self.mock_set_basic_auth.assert_not_called()
self.mock_set_client_cert.assert_not_called()
self.mock_set_ca_cert.assert_not_called()
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_prometheus_query_custom_uuid_label(self, mock_prometheus_get):
cfg.CONF.prometheus_client.instance_uuid_label = 'custom_uuid_label'
expected_query = (
"clamp_max((max by (custom_uuid_label)"
"(rate(ceilometer_cpu{custom_uuid_label='uuid-0'}[555s]))"
"/10e+8) *(100/4), 100)"
)
mock_instance = mock.Mock(
uuid='uuid-0',
memory=512,
disk=2,
vcpus=4)
result = self.helper._build_prometheus_query(
'max', 'ceilometer_cpu', 'uuid-0', '555', resource=mock_instance)
self.assertEqual(result, expected_query)
def test_setup_prometheus_client_tls(self):
cfg.CONF.prometheus_client.cafile = "/some/path"
prometheus_helper.PrometheusHelper()
self.mock_set_ca_cert.assert_called_once_with("/some/path")
def test_setup_prometheus_client_basic_auth(self):
cfg.CONF.prometheus_client.username = "user"
cfg.CONF.prometheus_client.password = "password"
prometheus_helper.PrometheusHelper()
self.mock_set_basic_auth.assert_called_once_with("user", "password")
def test_setup_prometheus_client_mtls(self):
cfg.CONF.prometheus_client.certfile = "/cert/path"
cfg.CONF.prometheus_client.keyfile = "/key/path"
cfg.CONF.prometheus_client.cafile = "/ca/path"
prometheus_helper.PrometheusHelper()
self.mock_set_client_cert.assert_called_once_with(
"/cert/path", "/key/path")