Extend statistics reporting

In some cases it is desired to extend stat metrics with additional tags
based on the selected connection (i.e. "environment"). Statsd does not
support tags, so add this only to influxdb for now.
Fix establishing connection from parameters when passing influx_config.
When exception happens, we need to also generate metric to be able to
see errors (i.e. timeout happening from Ansible)

I know there are no tests for that area so far at all, this will come later.

Change-Id: Ie0862f04eb224345559f9092cd0a9d8ffa43bef3
This commit is contained in:
Artem Goncharov 2020-05-12 09:59:04 +02:00
parent 7e2a51aeff
commit 2caaa989ce
4 changed files with 129 additions and 65 deletions

View File

@ -57,3 +57,15 @@ Metrics will be reported only when corresponding client libraries (
`statsd` for 'statsd' reporting, `influxdb` for influxdb reporting
correspondingly). When those libraries are not available reporting will be
silently ignored.
InfluxDB reporting allows setting additional tags into the metrics based on the
selected cloud.
.. code-block:: yaml
clouds:
my_cloud:
profile: some_profile
...
additional_metric_tags:
environment: production

View File

@ -289,25 +289,28 @@ class OpenStackConfig:
influxdb_cfg = metrics_config.get('influxdb', {})
# Parse InfluxDB configuration
if influxdb_config:
influxdb_cfg.update(influxdb_config)
if influxdb_cfg:
config = {}
if 'use_udp' in influxdb_cfg:
use_udp = influxdb_cfg['use_udp']
if isinstance(use_udp, str):
use_udp = use_udp.lower() in ('true', 'yes', '1')
elif not isinstance(use_udp, bool):
use_udp = False
self.log.warning('InfluxDB.use_udp value type is not '
'supported. Use one of '
'[true|false|yes|no|1|0]')
config['use_udp'] = use_udp
for key in ['host', 'port', 'username', 'password', 'database',
'measurement', 'timeout']:
if key in influxdb_cfg:
config[key] = influxdb_cfg[key]
self._influxdb_config = config
if not influxdb_config:
influxdb_config = influxdb_cfg
else:
influxdb_config.update(influxdb_cfg)
if influxdb_config:
config = {}
if 'use_udp' in influxdb_config:
use_udp = influxdb_config['use_udp']
if isinstance(use_udp, str):
use_udp = use_udp.lower() in ('true', 'yes', '1')
elif not isinstance(use_udp, bool):
use_udp = False
self.log.warning('InfluxDB.use_udp value type is not '
'supported. Use one of '
'[true|false|yes|no|1|0]')
config['use_udp'] = use_udp
for key in ['host', 'port', 'username', 'password', 'database',
'measurement', 'timeout']:
if key in influxdb_config:
config[key] = influxdb_config[key]
self._influxdb_config = config
if load_envvars:
statsd_host = statsd_host or os.environ.get('STATSD_HOST')

View File

@ -435,6 +435,14 @@ class Connection(
self.log.warning('Configured hook %s cannot be executed: %s',
vendor_hook, e)
# Add additional metrics into the configuration according to the
# selected connection. We don't want to deal with overall config in the
# proxy, just pass required part.
if (self.config._influxdb_config
and 'additional_metric_tags' in self.config.config):
self.config._influxdb_config['additional_metric_tags'] = \
self.config.config['additional_metric_tags']
@property
def session(self):
if not self._session:

View File

@ -91,15 +91,23 @@ class Proxy(adapter.Adapter):
if conn:
# Per-request setting should take precedence
global_request_id = conn._global_request_id
response = super(Proxy, self).request(
url, method,
connect_retries=connect_retries, raise_exc=raise_exc,
global_request_id=global_request_id,
**kwargs)
for h in response.history:
self._report_stats(h)
self._report_stats(response)
return response
try:
response = super(Proxy, self).request(
url, method,
connect_retries=connect_retries, raise_exc=raise_exc,
global_request_id=global_request_id,
**kwargs)
for h in response.history:
self._report_stats(h)
self._report_stats(response)
return response
except Exception as e:
# If we want metrics to be generated we also need to generate some
# in case of exceptions as well, so that timeouts and connection
# problems (especially when called from ansible) are being
# generated as well.
self._report_stats(None, url, method, e)
raise
def _extract_name(self, url, service_type=None, project_id=None):
'''Produce a key name to use in logging/metrics from the URL path.
@ -185,58 +193,91 @@ class Proxy(adapter.Adapter):
return name_parts
def _report_stats(self, response):
def _report_stats(self, response, url=None, method=None, exc=None):
if self._statsd_client:
self._report_stats_statsd(response)
self._report_stats_statsd(response, url, method, exc)
if self._prometheus_counter and self._prometheus_histogram:
self._report_stats_prometheus(response)
self._report_stats_prometheus(response, url, method, exc)
if self._influxdb_client:
self._report_stats_influxdb(response)
self._report_stats_influxdb(response, url, method, exc)
def _report_stats_statsd(self, response):
name_parts = self._extract_name(response.request.url,
def _report_stats_statsd(self, response, url=None, method=None, exc=None):
if response is not None and not url:
url = response.request.url
if response is not None and not method:
method = response.request.method
name_parts = self._extract_name(url,
self.service_type,
self.session.get_project_id())
key = '.'.join(
[self._statsd_prefix, self.service_type, response.request.method]
[self._statsd_prefix, self.service_type, method]
+ name_parts)
self._statsd_client.timing(key, int(
response.elapsed.microseconds / 1000))
self._statsd_client.incr(key)
if response is not None:
duration = int(response.elapsed.microseconds / 1000)
self._statsd_client.timing(key, duration)
self._statsd_client.incr(key)
elif exc is not None:
self._statsd_client.incr('%s.failed' % key)
def _report_stats_prometheus(self, response):
labels = dict(
method=response.request.method,
endpoint=response.request.url,
service_type=self.service_type,
status_code=response.status_code,
)
self._prometheus_counter.labels(**labels).inc()
self._prometheus_histogram.labels(**labels).observe(
response.elapsed.microseconds / 1000)
def _report_stats_prometheus(self, response, url=None, method=None,
exc=None):
if response is not None and not url:
url = response.request.url
if response is not None and not method:
method = response.request.method
if response is not None:
labels = dict(
method=method,
endpoint=url,
service_type=self.service_type,
status_code=response.status_code,
)
self._prometheus_counter.labels(**labels).inc()
self._prometheus_histogram.labels(**labels).observe(
response.elapsed.microseconds / 1000)
def _report_stats_influxdb(self, response):
def _report_stats_influxdb(self, response, url=None, method=None,
exc=None):
# NOTE(gtema): status_code is saved both as tag and field to give
# ability showing it as a value and not only as a legend.
# However Influx is not ok with having same name in tags and fields,
# therefore use different names.
if response is not None and not url:
url = response.request.url
if response is not None and not method:
method = response.request.method
tags = dict(
method=method,
name='_'.join(self._extract_name(
url, self.service_type,
self.session.get_project_id()))
)
fields = dict(
attempted=1
)
if response is not None:
fields['duration'] = int(response.elapsed.microseconds / 1000)
tags['status_code'] = str(response.status_code)
# Note(gtema): emit also status_code as a value (counter)
fields[str(response.status_code)] = 1
fields['%s.%s' % (method, response.status_code)] = 1
# Note(gtema): status_code field itself is also very helpful on the
# graphs to show what was the code, instead of counting its
# occurences
fields['status_code_val'] = response.status_code
elif exc:
fields['failed'] = 1
if 'additional_metric_tags' in self._influxdb_config:
tags.update(self._influxdb_config['additional_metric_tags'])
measurement = self._influxdb_config.get(
'measurement', 'openstack_api') \
if self._influxdb_config else 'openstack_api'
# Note(gtema) append service name into the measurement name
measurement = '%s.%s' % (measurement, self.service_type)
data = [dict(
measurement=(self._influxdb_config.get('measurement',
'openstack_api')
if self._influxdb_config else 'openstack_api'),
tags=dict(
method=response.request.method,
service_type=self.service_type,
status_code=response.status_code,
name='_'.join(self._extract_name(
response.request.url, self.service_type,
self.session.get_project_id())
)
),
fields=dict(
duration=int(response.elapsed.microseconds / 1000),
status_code_val=int(response.status_code)
)
measurement=measurement,
tags=tags,
fields=fields
)]
try:
self._influxdb_client.write_points(data)