Add support for instance metrics to prometheus datasource

The vm_workload_consolidation, workload_balance and workload_stabilization
strategies require some instance metrics. This patch adds support for them.

The implementation is based on a Prometheus store populated by sg-core
from ceilometer metrics collected with the Pollster source.

- instance_ram_usage: relies on the ceilometer_memory_usage metric created
  from the ceilometer memory.usage meter.
- instance_ram_allocated: relies on the memory value provided by the
  inventory built from the nova and placement APIs.
- instance_cpu_usage: relies on the ceilometer_cpu metric created from the
  ceilometer cpu meter. A maximum value of 100 is enforced in the query
  (see the example queries after this list).
- instance_root_disk_size: relies on the `disk` value provided by the
  inventory built from the nova and placement APIs.
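
For illustration, the PromQL queries built by the datasource for the usage
metrics look roughly as follows (the uuid, period and vcpu count are
placeholders; the query shapes are taken from this patch's query builder):

  avg_over_time(ceilometer_memory_usage{resource='<instance uuid>'}[300s])
  clamp_max((avg by (instance)(rate(ceilometer_cpu{resource='<instance uuid>'}[300s]))/10e+8) *(100/<vcpus>), 100)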

A new parameter `instance_uuid_label` has been added to the prometheus
datasource configuration to identify the label that stores the OpenStack
instance uuid in each instance metric in Prometheus. The default value is
`resource`.
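
As a minimal configuration sketch (the host and port values are
placeholders), the option is set in the [prometheus_client] section of
watcher.conf:

  [prometheus_client]
  host = prometheus.example.com
  port = 9090
  instance_uuid_label = resource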

Change-Id: I2f2b56aa002014e511a5e48398ef1da43fc4f5e2
Alfredo Moralejo 2025-01-10 11:30:09 +01:00
parent 3f26dc47f2
commit 136e5d927c
5 changed files with 321 additions and 6 deletions


@@ -43,6 +43,12 @@ An example ``fqdn_instance_map`` is the following:
'tria.controlplane.domain': '10.1.2.3:9100'
}
For instance metrics, Prometheus must contain a label holding the uuid of
the OpenStack instance in each relevant metric. By default, the datasource
looks for the label ``resource``. The ``instance_uuid_label`` config option
in watcher.conf allows deployers to override this default with any other
label name that stores the ``uuid``.
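For example, a memory usage sample exported for an instance could look like
the one below, where the ``resource`` label carries the instance uuid (the
value shown is illustrative):

  ceilometer_memory_usage{resource='<instance uuid>'} 49.86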
Limitations
-----------
The current implementation doesn't support the ``statistic_series`` function of
@@ -89,6 +95,10 @@ duplicated below from the code as they are self documenting:
default="fqdn",
help="The label that Prometheus uses to store the fqdn of "
"exporters. Defaults to 'fqdn'."),
cfg.StrOpt('instance_uuid_label',
default="resource",
help="The label that Prometheus uses to store the uuid of "
"OpenStack instances. Defaults to 'resource'."),
cfg.StrOpt('username',
help="The basic_auth username to use to authenticate with the "
"Prometheus server."),


@@ -0,0 +1,6 @@
---
features:
- |
Support for instance metrics has been added to the prometheus data source.
The included metrics are `instance_cpu_usage`, `instance_ram_usage`,
`instance_ram_allocated` and `instance_root_disk_size`.


@@ -30,6 +30,10 @@ PROMETHEUS_CLIENT_OPTS = [
default="fqdn",
help="The label that Prometheus uses to store the fqdn of "
"exporters. Defaults to 'fqdn'."),
cfg.StrOpt('instance_uuid_label',
default="resource",
help="The label that Prometheus uses to store the uuid of "
"OpenStack instances. Defaults to 'resource'."),
cfg.StrOpt('username',
help="The basic_auth username to use to authenticate with the "
"Prometheus server."),


@@ -38,11 +38,11 @@ class PrometheusHelper(base.DataSourceBase):
host_inlet_temp=None,
host_airflow=None,
host_power=None,
instance_cpu_usage=None,
instance_ram_usage=None,
instance_ram_allocated=None,
instance_cpu_usage='ceilometer_cpu',
instance_ram_usage='ceilometer_memory_usage',
instance_ram_allocated='instance.memory',
instance_l3_cache_usage=None,
instance_root_disk_size=None,
instance_root_disk_size='instance.disk',
)
AGGREGATES_MAP = dict(mean='avg', max='max', min='min', count='avg')
@@ -258,7 +258,7 @@ class PrometheusHelper(base.DataSourceBase):
return promql_aggregate
def _build_prometheus_query(self, aggregate, meter, instance_label,
period):
period, resource=None):
"""Build and return the prometheus query string with the given args
This function builds and returns the string query that will be sent
@@ -286,12 +286,14 @@ class PrometheusHelper(base.DataSourceBase):
:param meter: the name of the Prometheus meter to use
:param instance_label: the Prometheus instance label (scrape target).
:param period: the period in seconds for which to query
:param resource: the resource object for which metrics are requested
:return: a String containing the Prometheus query
:raises watcher.common.exception.InvalidParameter if params are None
:raises watcher.common.exception.InvalidParameter if meter is not
known or currently supported (prometheus meter name).
"""
query_args = None
uuid_label_key = CONF.prometheus_client.instance_uuid_label
if (meter is None or aggregate is None or instance_label is None or
period is None):
raise exception.InvalidParameter(
@@ -317,6 +319,30 @@ class PrometheusHelper(base.DataSourceBase):
(instance_label, aggregate, meter,
instance_label, period)
)
elif meter == 'ceilometer_memory_usage':
query_args = (
"%s_over_time(%s{%s='%s'}[%ss])" %
(aggregate, meter, uuid_label_key, instance_label, period)
)
elif meter == 'ceilometer_cpu':
# We are converting the total cumulative cpu time (ns) to a cpu usage
# percentage, so we need to divide by the number of vcpus.
# As this is a percentage metric, we cap it at 100 in the query. In
# very high usage cases, prometheus has been observed reporting values
# higher than 100, which can lead to unexpected behaviors.
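# For example, a rate of 3e8 ns of cpu time per second on a 2 vcpu
# instance yields (3e8 / 10e+8) * (100 / 2) = 15 percent; 10e+8 equals
# 1e9, the number of nanoseconds in a second.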
vcpus = resource.vcpus
if not vcpus:
LOG.warning(
"instance vcpu count not set for instance %s, assuming 1",
instance_label
)
vcpus = 1
query_args = (
"clamp_max((%s by (instance)(rate(%s{%s='%s'}[%ss]))/10e+8) "
"*(100/%s), 100)" %
(aggregate, meter, uuid_label_key, instance_label, period,
vcpus)
)
else:
raise exception.InvalidParameter(
message=(_("Cannot process prometheus meter %s") % meter)
@@ -365,9 +391,21 @@ class PrometheusHelper(base.DataSourceBase):
query_args = ''
instance_label = ''
# For the instance resource type, the datasource expects the uuid of
# the instance to be stored in a label of the prometheus metrics whose
# key is configurable (instance_uuid_label).
if resource_type == 'compute_node':
instance_label = self._resolve_prometheus_instance_label(
resource.hostname)
elif resource_type == 'instance':
instance_label = resource.uuid
# For the ram_allocated and root_disk_size metrics there are no valid
# values in the prometheus backend store. We rely on the values
# provided by the VMs inventory.
if meter == 'instance.memory':
return float(resource.memory)
elif meter == 'instance.disk':
return float(resource.disk)
else:
LOG.warning(
"Prometheus data source does not currently support "
@@ -377,7 +415,7 @@ class PrometheusHelper(base.DataSourceBase):
promql_aggregate = self._resolve_prometheus_aggregate(aggregate, meter)
query_args = self._build_prometheus_query(
promql_aggregate, meter, instance_label, period
promql_aggregate, meter, instance_label, period, resource
)
if not query_args:
LOG.error("Cannot proceed without valid prometheus query")
@@ -440,3 +478,35 @@ class PrometheusHelper(base.DataSourceBase):
'host_ram_usage', period=period,
granularity=granularity, aggregate=aggregate)
return float(ram_usage) if ram_usage else None
def get_instance_ram_usage(self, resource, period=300,
aggregate="mean", granularity=None):
ram_usage = self.statistic_aggregation(
resource, 'instance',
'instance_ram_usage', period=period,
granularity=granularity, aggregate=aggregate)
return ram_usage
def get_instance_cpu_usage(self, resource, period=300,
aggregate="mean", granularity=None):
cpu_usage = self.statistic_aggregation(
resource, 'instance',
'instance_cpu_usage', period=period,
granularity=granularity, aggregate=aggregate)
return cpu_usage
def get_instance_ram_allocated(self, resource, period=300,
aggregate="mean", granularity=None):
ram_allocated = self.statistic_aggregation(
resource, 'instance',
'instance_ram_allocated', period=period,
granularity=granularity, aggregate=aggregate)
return ram_allocated
def get_instance_root_disk_size(self, resource, period=300,
aggregate="mean", granularity=None):
root_disk_size = self.statistic_aggregation(
resource, 'instance',
'instance_root_disk_size', period=period,
granularity=granularity, aggregate=aggregate)
return root_disk_size
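
The new getters can be exercised with a minimal sketch like the one below.
The import path is an assumption, a reachable Prometheus configured in
watcher.conf is needed for the usage metrics, and the stand-in instance
object simply mirrors the attributes the datasource reads:

  from types import SimpleNamespace

  # Assumed import path for the helper extended by this patch.
  from watcher.decision_engine.datasources import prometheus

  # Stand-in instance exposing the attributes the datasource uses
  # (uuid, vcpus, memory, disk), mirroring the unit tests in this change.
  instance = SimpleNamespace(uuid='uuid-0', vcpus=2, memory=512, disk=2)

  helper = prometheus.PrometheusHelper()  # reads the [prometheus_client] options
  cpu_pct = helper.get_instance_cpu_usage(instance)         # percent, clamped at 100
  ram_used = helper.get_instance_ram_usage(instance)        # from ceilometer_memory_usage
  ram_alloc = helper.get_instance_ram_allocated(instance)   # resolved from resource.memory
  disk_size = helper.get_instance_root_disk_size(instance)  # resolved from resource.disk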


@@ -46,6 +46,11 @@ class TestPrometheusHelper(base.BaseTestCase):
spec=prometheus_helper.PrometheusHelper.statistic_aggregation)
self.mock_aggregation = stat_agg_patcher.start()
self.addCleanup(stat_agg_patcher.stop)
self.mock_instance = mock.Mock(
uuid='uuid-0',
memory=512,
disk=2,
vcpus=2)
def test_unset_missing_prometheus_host(self):
cfg.CONF.prometheus_client.port = '123'
@@ -144,6 +149,167 @@ class TestPrometheusHelper(base.BaseTestCase):
"100 - (avg by (instance)(rate(node_cpu_seconds_total"
"{mode='idle',instance='10.0.1.2:9100'}[300s])) * 100)")
@mock.patch.object(prometheus_client.PrometheusAPIClient, 'query')
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_get_instance_cpu_usage(self, mock_prometheus_get,
mock_prometheus_query):
mock_instance = self.mock_instance
expected_cpu_usage = 13.2706140350701673
mock_prom_metric = mock.Mock(
labels={'resource': 'uuid-0'},
timestamp=1731065985.408,
value=expected_cpu_usage
)
mock_prometheus_query.return_value = [mock_prom_metric]
helper = prometheus_helper.PrometheusHelper()
cpu_usage = helper.get_instance_cpu_usage(mock_instance)
self.assertIsInstance(cpu_usage, float)
self.assertEqual(expected_cpu_usage, cpu_usage)
@mock.patch.object(prometheus_client.PrometheusAPIClient, 'query')
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_get_instance_ram_usage(self, mock_prometheus_get,
mock_prometheus_query):
mock_instance = self.mock_instance
expected_ram_usage = 49.86
mock_prom_metric = mock.Mock(
labels={'resource': 'uuid-0'},
timestamp=1731065985.408,
value=expected_ram_usage
)
mock_prometheus_query.return_value = [mock_prom_metric]
helper = prometheus_helper.PrometheusHelper()
ram_usage = helper.get_instance_ram_usage(
mock_instance, period=222, aggregate="max",
granularity=200)
self.assertIsInstance(ram_usage, float)
self.assertEqual(expected_ram_usage, ram_usage)
@mock.patch.object(prometheus_client.PrometheusAPIClient, 'query')
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_get_instance_ram_allocated(self, mock_prometheus_get,
mock_prometheus_query):
mock_instance = self.mock_instance
helper = prometheus_helper.PrometheusHelper()
ram_allocated = helper.get_instance_ram_allocated(mock_instance,
period=222,
aggregate="max")
self.assertIsInstance(ram_allocated, float)
self.assertEqual(512, ram_allocated)
@mock.patch.object(prometheus_client.PrometheusAPIClient, 'query')
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_get_instance_root_disk_size(self, mock_prometheus_get,
mock_prometheus_query):
mock_instance = self.mock_instance
helper = prometheus_helper.PrometheusHelper()
disk_size = helper.get_instance_root_disk_size(mock_instance,
period=331,
aggregate="avg")
self.assertIsInstance(disk_size, float)
self.assertEqual(2, disk_size)
@mock.patch.object(prometheus_client.PrometheusAPIClient, 'query')
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_prometheus_stt_agg_instance_cpu_usage(self, mock_prometheus_get,
mock_prometheus_query):
mock_instance = self.mock_instance
expected_cpu_usage = 13.2706140350701673
mock_prom_metric = mock.Mock(
labels={'resource': 'uuid-0'},
timestamp=1731065985.408,
value=expected_cpu_usage
)
mock_prometheus_query.return_value = [mock_prom_metric]
helper = prometheus_helper.PrometheusHelper()
result_cpu = helper.statistic_aggregation(
resource=mock_instance,
resource_type='instance',
meter_name='instance_cpu_usage',
period=300,
granularity=300,
aggregate='mean',
)
self.assertEqual(expected_cpu_usage, result_cpu)
self.assertIsInstance(result_cpu, float)
mock_prometheus_query.assert_called_once_with(
"clamp_max((avg by (instance)(rate("
"ceilometer_cpu{resource='uuid-0'}[300s]))"
"/10e+8) *(100/2), 100)"
)
@mock.patch.object(prometheus_client.PrometheusAPIClient, 'query')
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_prometheus_stt_agg_instance_ram_usage(self, mock_prometheus_get,
mock_prometheus_query):
mock_instance = self.mock_instance
expected_ram_usage = 49.86
mock_prom_metric = mock.Mock(
labels={'resource': 'uuid-0'},
timestamp=1731065985.408,
value=expected_ram_usage
)
mock_prometheus_query.return_value = [mock_prom_metric]
helper = prometheus_helper.PrometheusHelper()
result_ram_usage = helper.statistic_aggregation(
resource=mock_instance,
resource_type='instance',
meter_name='instance_ram_usage',
period=300,
granularity=300,
aggregate='mean',
)
self.assertEqual(expected_ram_usage, result_ram_usage)
self.assertIsInstance(result_ram_usage, float)
mock_prometheus_query.assert_called_with(
"avg_over_time(ceilometer_memory_usage{resource='uuid-0'}[300s])"
)
@mock.patch.object(prometheus_client.PrometheusAPIClient, 'query')
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_prometheus_stt_agg_instance_root_size(self, mock_prometheus_get,
mock_prometheus_query):
mock_instance = self.mock_instance
helper = prometheus_helper.PrometheusHelper()
result_disk = helper.statistic_aggregation(
resource=mock_instance,
resource_type='instance',
meter_name='instance_root_disk_size',
period=300,
granularity=300,
aggregate='mean',
)
self.assertEqual(2, result_disk)
self.assertIsInstance(result_disk, float)
@mock.patch.object(prometheus_client.PrometheusAPIClient, 'query')
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_prometheus_stt_agg_instance_ram_alloc(self, mock_prometheus_get,
mock_prometheus_query):
mock_instance = self.mock_instance
helper = prometheus_helper.PrometheusHelper()
result_memory = helper.statistic_aggregation(
resource=mock_instance,
resource_type='instance',
meter_name='instance_ram_allocated',
period=300,
granularity=300,
aggregate='mean',
)
self.assertEqual(512, result_memory)
self.assertIsInstance(result_memory, float)
def test_statistic_aggregation_metric_unavailable(self):
self.assertRaisesRegex(
NotImplementedError, 'does not support statistic_series',
@@ -390,6 +556,48 @@ class TestPrometheusHelper(base.BaseTestCase):
'min', 'node_memory_MemAvailable_bytes', 'd_host', '222')
self.assertEqual(result, expected_query)
def test_build_prometheus_query_instance_memory_avg_agg(self):
expected_query = (
"avg_over_time(ceilometer_memory_usage{resource='uuid-0'}[555s])"
)
result = self.helper._build_prometheus_query(
'avg', 'ceilometer_memory_usage', 'uuid-0', '555')
self.assertEqual(result, expected_query)
def test_build_prometheus_query_instance_memory_min_agg(self):
expected_query = (
"min_over_time(ceilometer_memory_usage{resource='uuid-0'}[222s])"
)
result = self.helper._build_prometheus_query(
'min', 'ceilometer_memory_usage', 'uuid-0', '222')
self.assertEqual(result, expected_query)
def test_build_prometheus_query_instance_cpu_avg_agg(self):
expected_query = (
"clamp_max((avg by (instance)(rate("
"ceilometer_cpu{resource='uuid-0'}[222s]))"
"/10e+8) *(100/2), 100)"
)
result = self.helper._build_prometheus_query(
'avg', 'ceilometer_cpu', 'uuid-0', '222',
resource=self.mock_instance)
self.assertEqual(result, expected_query)
def test_build_prometheus_query_instance_cpu_max_agg(self):
expected_query = (
"clamp_max((max by (instance)(rate("
"ceilometer_cpu{resource='uuid-0'}[555s]))"
"/10e+8) *(100/4), 100)"
)
mock_instance = mock.Mock(
uuid='uuid-0',
memory=512,
disk=2,
vcpus=4)
result = self.helper._build_prometheus_query(
'max', 'ceilometer_cpu', 'uuid-0', '555', resource=mock_instance)
self.assertEqual(result, expected_query)
def test_build_prometheus_query_error(self):
self.assertRaisesRegex(
exception.InvalidParameter, 'Cannot process prometheus meter NOPE',
@@ -416,3 +624,20 @@ class TestPrometheusHelper(base.BaseTestCase):
self.assertRaisesRegex(
exception.InvalidParameter, 'Unknown Watcher aggregate NOPE.',
self.helper._resolve_prometheus_aggregate, 'NOPE', 'some_meter')
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_prometheus_query_custom_uuid_label(self, mock_prometheus_get):
cfg.CONF.prometheus_client.instance_uuid_label = 'custom_uuid_label'
expected_query = (
"clamp_max((max by (instance)"
"(rate(ceilometer_cpu{custom_uuid_label='uuid-0'}[555s]))"
"/10e+8) *(100/4), 100)"
)
mock_instance = mock.Mock(
uuid='uuid-0',
memory=512,
disk=2,
vcpus=4)
result = self.helper._build_prometheus_query(
'max', 'ceilometer_cpu', 'uuid-0', '555', resource=mock_instance)
self.assertEqual(result, expected_query)