diff --git a/collectd-extensions/src/cpu.py b/collectd-extensions/src/cpu.py index f279ec2..4b702c5 100755 --- a/collectd-extensions/src/cpu.py +++ b/collectd-extensions/src/cpu.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2018-2024 Wind River Systems, Inc. +# Copyright (c) 2018-2025 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -504,7 +504,7 @@ def calculate_occupancy( continue # K8S platform system usage, i.e., essential: kube-system - # check for component label app.starlingx.io/component=platform + # check for component label app.starlingx.io/component=platform if pod.is_platform_resource(): cpuacct[pc.GROUP_OVERALL][pc.GROUP_K8S_SYSTEM] += acct cpuwait[pc.GROUP_OVERALL][pc.GROUP_K8S_SYSTEM] += wait @@ -757,6 +757,8 @@ def update_cpu_data(init=False): since this routine was last run. """ + global obj + # Get epoch time in floating seconds now = time.time() @@ -795,51 +797,7 @@ def update_cpu_data(init=False): # Refresh the k8s pod information if we have discovered new cgroups cg_pods = set(t1_cpuacct[pc.GROUP_PODS].keys()) - if not cg_pods.issubset(obj.k8s_pods): - if obj.debug: - collectd.info('%s Refresh k8s pod information.' % (PLUGIN_DEBUG)) - obj.k8s_pods = set() - try: - pods = obj._k8s_client.kube_get_local_pods() - for i in pods: - # NOTE: parent pod cgroup name contains annotation config.hash as - # part of its name, otherwise it contains the pod uid. - uid = i.metadata.uid - if ((i.metadata.annotations) and - (pc.POD_ANNOTATION_KEY in i.metadata.annotations)): - hash_uid = i.metadata.annotations.get(pc.POD_ANNOTATION_KEY, - None) - if hash_uid: - if obj.debug: - collectd.info('%s POD_ANNOTATION_KEY: ' - 'hash=%s, uid=%s, ' - 'name=%s, namespace=%s, qos_class=%s,' - 'is_platform_label=%s' - % (PLUGIN_DEBUG, - hash_uid, - i.metadata.uid, - i.metadata.name, - i.metadata.namespace, - i.status.qos_class, - i.metadata.labels.get(pc.PLATFORM_LABEL_KEY) == - pc.GROUP_PLATFORM)) - uid = hash_uid - - obj.k8s_pods.add(uid) - if uid not in obj._cache: - obj._cache[uid] = pc.POD_object(i.metadata.uid, - i.metadata.name, - i.metadata.namespace, - i.status.qos_class, - i.metadata.labels) - # Remove stale _cache entries - remove_uids = set(obj._cache.keys()) - obj.k8s_pods - for uid in remove_uids: - del obj._cache[uid] - except ApiException: - # continue with remainder of calculations, keeping cache - collectd.warning('%s encountered kube ApiException' % (PLUGIN)) - pass + obj = pc.pods_monitoring(cg_pods, obj, PLUGIN_DEBUG) # Save initial state information if init: diff --git a/collectd-extensions/src/memory.py b/collectd-extensions/src/memory.py index 114411a..3b957c1 100755 --- a/collectd-extensions/src/memory.py +++ b/collectd-extensions/src/memory.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2018-2024 Wind River Systems, Inc. +# Copyright (c) 2018-2025 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -686,6 +686,8 @@ def init_func(): def read_func(): """collectd memory monitor plugin read function""" + global obj + if obj.init_complete is False: init_func() return 0 @@ -710,52 +712,7 @@ def read_func(): # Refresh the k8s pod information if we have discovered new cgroups cg_pods = set(memory[pc.GROUP_PODS].keys()) - if not cg_pods.issubset(obj.k8s_pods): - if obj.debug: - collectd.info('%s: Refresh k8s pod information.' % (PLUGIN_DEBUG)) - obj.k8s_pods = set() - try: - pods = obj._k8s_client.kube_get_local_pods() - for i in pods: - # NOTE: parent pod cgroup name contains annotation config.hash as - # part of its name, otherwise it contains the pod uid. - uid = i.metadata.uid - if ((i.metadata.annotations) and - (pc.POD_ANNOTATION_KEY in i.metadata.annotations)): - hash_uid = i.metadata.annotations.get(pc.POD_ANNOTATION_KEY, - None) - if hash_uid: - if obj.debug: - collectd.info('%s: POD_ANNOTATION_KEY: ' - 'hash=%s, uid=%s, ' - 'name=%s, namespace=%s, qos_class=%s, ' - 'is_platform_label=%s' - % (PLUGIN_DEBUG, - hash_uid, - i.metadata.uid, - i.metadata.name, - i.metadata.namespace, - i.status.qos_class, - i.metadata.labels.get(pc.PLATFORM_LABEL_KEY) == - pc.GROUP_PLATFORM)) - uid = hash_uid - - obj.k8s_pods.add(uid) - if uid not in obj._cache: - obj._cache[uid] = pc.POD_object(i.metadata.uid, - i.metadata.name, - i.metadata.namespace, - i.status.qos_class, - i.metadata.labels) - - # Remove stale _cache entries - remove_uids = set(obj._cache.keys()) - obj.k8s_pods - for uid in remove_uids: - del obj._cache[uid] - except ApiException: - # continue with remainder of calculations, keeping cache - collectd.warning("memory plugin encountered kube ApiException") - pass + obj = pc.pods_monitoring(cg_pods, obj, PLUGIN_DEBUG) # Summarize memory usage for various groupings for g in pc.OVERALL_GROUPS: diff --git a/collectd-extensions/src/plugin_common.py b/collectd-extensions/src/plugin_common.py index 1bc8f6d..5ad2f7d 100644 --- a/collectd-extensions/src/plugin_common.py +++ b/collectd-extensions/src/plugin_common.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2019-2024 Wind River Systems, Inc. +# Copyright (c) 2019-2025 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -27,6 +27,7 @@ from kubernetes import __version__ as K8S_MODULE_VERSION from kubernetes import client from kubernetes import config from kubernetes.client import Configuration +from kubernetes.client.rest import ApiException import urllib3 @@ -106,10 +107,7 @@ BASE_GROUPS = [CGROUP_INIT, CGROUP_DOCKER, CGROUP_SYSTEM, CGROUP_USER, BASE_GROUPS_EXCLUDE = [CGROUP_K8S, CGROUP_MACHINE] # Groupings of pods by kubernetes namespace -K8S_NAMESPACE_SYSTEM = ['kube-system', 'armada', 'cert-manager', 'portieris', - 'vault', 'notification', 'platform-deployment-manager', - 'flux-helm', 'metrics-server', 'node-feature-discovery', - 'intel-power', 'power-metrics', 'sriov-fec-system'] +K8S_NAMESPACE_SYSTEM = ['kube-system'] K8S_NAMESPACE_ADDON = ['monitor', 'openstack'] PLATFORM_LABEL_KEY = "app.starlingx.io/component" @@ -701,6 +699,16 @@ class K8sClient(object): spec=pod.get('spec'), status=self._as_kube_status(pod.get('status'))) + def _get_namespace_labels(self, namespace_list): + # some namespaces might not have label, so + # return empty dict for them + namespace_labels = {} + for n in namespace_list: + labels = n["metadata"].get("labels", {}) + name = n["metadata"]["name"] + namespace_labels.update({name: labels}) + return namespace_labels + def _as_kube_status(self, status): # status (json) dictionary has the following keys: # 'conditions', 'containerStatuses', 'hostIP', 'phase', @@ -778,14 +786,36 @@ class K8sClient(object): collectd.error("kube_get_local_pods: error=%s" % (str(err))) raise + def get_namespace_labels(self): + # Get namespace labels + try: + kube_results = subprocess.check_output( + ['kubectl', '--kubeconfig', KUBELET_CONF, + 'get', 'namespaces', + '-o', 'json', + ], timeout=K8S_TIMEOUT).decode() + json_results = json.loads(kube_results) + + except subprocess.TimeoutExpired: + collectd.error('kube_get_namespaces: Timeout') + return {} + except json.JSONDecodeError as e: + collectd.error('kube_get_namespaces: Could not parse json output, error=%s' % (str(e))) + return {} + except subprocess.CalledProcessError as e: + collectd.error('kube_get_namespaces: Could not get namespaces, error=%s' % (str(e))) + return {} + + return self._get_namespace_labels(json_results["items"]) + class POD_object: - def __init__(self, uid, name, namespace, qos_class, labels=None): + def __init__(self, uid, name, namespace, qos_class, platform_label=False): self.uid = uid self.name = name self.namespace = namespace self.qos_class = qos_class - self.labels = labels + self.platform_label = platform_label def __str__(self): return str(self.__class__) + ": " + str(self.__dict__) @@ -797,8 +827,7 @@ class POD_object: """Check whether pod contains platform namespace or platform label""" if (self.namespace in K8S_NAMESPACE_SYSTEM - or (self.labels is not None and - self.labels.get(PLATFORM_LABEL_KEY) == GROUP_PLATFORM)): + or self.platform_label): return True return False @@ -909,3 +938,63 @@ def format_range_set(items): s = "%s-%s" % (rng[0][1], rng[-1][1]) ranges.append(s) return ','.join(ranges) + + +def pods_monitoring(cg_pods, obj, PLUGIN_DEBUG): + if not cg_pods.issubset(obj.k8s_pods): + if obj.debug: + collectd.info('%s: Refresh k8s pod information.' % (PLUGIN_DEBUG)) + obj.k8s_pods = set() + try: + namespace_labels = obj._k8s_client.get_namespace_labels() + pods = obj._k8s_client.kube_get_local_pods() + for i in pods: + # NOTE: parent pod cgroup name contains annotation config.hash as + # part of its name, otherwise it contains the pod uid. + uid = i.metadata.uid + + namespace = i.metadata.namespace + platform_label = False + # Check if platform label is present in namespace label or pod label + if namespace_labels[namespace].get(PLATFORM_LABEL_KEY) == GROUP_PLATFORM \ + or i.metadata.labels.get(PLATFORM_LABEL_KEY) == GROUP_PLATFORM: + platform_label = True + + if ((i.metadata.annotations) and + (POD_ANNOTATION_KEY in i.metadata.annotations)): + hash_uid = i.metadata.annotations.get(POD_ANNOTATION_KEY, + None) + if hash_uid: + if obj.debug: + collectd.info('%s: POD_ANNOTATION_KEY: ' + 'hash=%s, uid=%s, ' + 'name=%s, namespace=%s, qos_class=%s, ' + 'is_platform_label=%s' + % (PLUGIN_DEBUG, + hash_uid, + i.metadata.uid, + i.metadata.name, + namespace, + i.status.qos_class, + platform_label)) + uid = hash_uid + + obj.k8s_pods.add(uid) + if uid not in obj._cache: + obj._cache[uid] = POD_object(i.metadata.uid, + i.metadata.name, + namespace, + i.status.qos_class, + platform_label) + + # Remove stale _cache entries + remove_uids = set(obj._cache.keys()) - obj.k8s_pods + for uid in remove_uids: + del obj._cache[uid] + + except ApiException: + # continue with remainder of calculations, keeping cache + collectd.warning('%s encountered kube ApiException' % (PLUGIN_DEBUG)) + pass + + return obj