From e283c3103026a3d134cc86a0c0fd67a108674e31 Mon Sep 17 00:00:00 2001 From: Jan Chaloupka Date: Thu, 7 Nov 2024 16:32:06 +0100 Subject: [PATCH] [nodeutilization]: prometheus usage client with prometheus metrics --- README.md | 37 ++- cmd/descheduler/app/options/options.go | 2 + kubernetes/base/rbac.yaml | 22 ++ pkg/api/types.go | 28 ++ pkg/api/v1alpha2/types.go | 28 ++ pkg/api/v1alpha2/zz_generated.conversion.go | 96 +++++++ pkg/api/v1alpha2/zz_generated.deepcopy.go | 67 ++++- pkg/api/zz_generated.deepcopy.go | 67 ++++- pkg/descheduler/client/client.go | 71 +++++ pkg/descheduler/descheduler.go | 257 ++++++++++++++++-- pkg/descheduler/descheduler_test.go | 2 +- pkg/descheduler/policyconfig.go | 38 ++- pkg/descheduler/policyconfig_test.go | 139 +++++++++- pkg/framework/fake/fake.go | 7 + .../nodeutilization/highnodeutilization.go | 2 +- .../nodeutilization/lownodeutilization.go | 52 +++- .../lownodeutilization_test.go | 248 ++++++++++++++++- .../nodeutilization/nodeutilization.go | 47 +++- .../plugins/nodeutilization/types.go | 13 +- .../plugins/nodeutilization/usageclients.go | 124 ++++++++- .../nodeutilization/usageclients_test.go | 164 ++++++++++- .../plugins/nodeutilization/validation.go | 11 + .../nodeutilization/validation_test.go | 63 ++++- .../nodeutilization/zz_generated.deepcopy.go | 23 +- pkg/framework/profile/profile.go | 15 + pkg/framework/types/types.go | 3 + 26 files changed, 1541 insertions(+), 85 deletions(-) diff --git a/README.md b/README.md index afb8ce87f..f3aa496d9 100644 --- a/README.md +++ b/README.md @@ -129,14 +129,18 @@ These are top level keys in the Descheduler Policy that you can use to configure | `metricsProviders` | `[]object` | `nil` | Enables various metrics providers like Kubernetes [Metrics Server](https://kubernetes-sigs.github.io/metrics-server/) | | `evictionFailureEventNotification` | `bool` | `false` | Enables eviction failure event notification. 
| | `gracePeriodSeconds` | `int` | `0` | The duration in seconds before the object should be deleted. The value zero indicates delete immediately. | +| `prometheus` |`object`| `nil` | Configures collection of Prometheus metrics for actual resource utilization | +| `prometheus.url` |`string`| `nil` | Points to a Prometheus server url | +| `prometheus.authToken` |`object`| `nil` | Sets Prometheus server authentication token. If not specified in cluster authentication token from the container's file system is read. | +| `prometheus.authToken.secretReference` |`object`| `nil` | Read the authentication token from a kubernetes secret (the secret is expected to contain the token under `prometheusAuthToken` data key) | +| `prometheus.authToken.secretReference.namespace` |`string`| `nil` | Authentication token kubernetes secret namespace (currently, the RBAC configuration permits retrieving secrets from the `kube-system` namespace. If the secret needs to be accessed from a different namespace, the existing RBAC rules must be explicitly extended. | +| `prometheus.authToken.secretReference.name` |`string`| `nil` | Authentication token kubernetes secret name | The descheduler currently allows to configure a metric collection of Kubernetes Metrics through `metricsProviders` field. -The previous way of setting `metricsCollector` field is deprecated. There is currently one source to configure: -``` -metricsProviders: -- source: KubernetesMetrics -``` -The list can be extended with other metrics providers in the future. +The previous way of setting `metricsCollector` field is deprecated. There are currently two sources to configure: +- `KubernetesMetrics`: enables metrics collection from Kubernetes Metrics server +- `Prometheus`: enables metrics collection from Prometheus server + In general, each plugin can consume metrics from a different provider so multiple distinct providers can be configured in parallel. 
@@ -174,9 +178,15 @@ maxNoOfPodsToEvictPerNode: 5000 # you don't need to set this, unlimited if not s maxNoOfPodsToEvictPerNamespace: 5000 # you don't need to set this, unlimited if not set maxNoOfPodsToEvictTotal: 5000 # you don't need to set this, unlimited if not set gracePeriodSeconds: 60 # you don't need to set this, 0 if not set -# you don't need to set this, Kubernetes metrics are not collected if not set +# you don't need to set this, metrics are not collected if not set metricsProviders: -- source: KubernetesMetrics +- source: Prometheus + prometheus: + url: http://prometheus-kube-prometheus-prometheus.prom.svc.cluster.local + authToken: + secretReference: + namespace: "kube-system" + name: "authtoken" profiles: - name: ProfileName pluginConfig: @@ -303,6 +313,10 @@ like `kubectl top`) may differ from the calculated consumption, due to these com actual usage metrics. Metrics-based descheduling can be enabled by setting `metricsUtilization.metricsServer` field (deprecated) or `metricsUtilization.source` field to `KubernetesMetrics`. In order to have the plugin consume the metrics the metric provider needs to be configured as well. +Alternatively, it is possible to create a prometheus client and configure a prometheus query to consume +metrics outside of the kubernetes metrics server. The query is expected to return a vector of values for +each node. The values are expected to be any real number within <0; 1> interval. During eviction only +a single pod is evicted at most from each overutilized node. There's currently no support for evicting more. See `metricsProviders` field at [Top Level configuration](#top-level-configuration) for available options. 
**Parameters:** @@ -318,6 +332,7 @@ See `metricsProviders` field at [Top Level configuration](#top-level-configurati |`metricsUtilization`|object| |`metricsUtilization.metricsServer` (deprecated)|bool| |`metricsUtilization.source`|string| +|`metricsUtilization.prometheus.query`|string| **Example:** @@ -338,8 +353,10 @@ profiles: "cpu" : 50 "memory": 50 "pods": 50 - metricsUtilization: - source: KubernetesMetrics + # metricsUtilization: + # source: Prometheus + # prometheus: + # query: instance:node_cpu:rate:sum evictionLimits: node: 5 plugins: diff --git a/cmd/descheduler/app/options/options.go b/cmd/descheduler/app/options/options.go index 3d84c3158..b5617297c 100644 --- a/cmd/descheduler/app/options/options.go +++ b/cmd/descheduler/app/options/options.go @@ -21,6 +21,7 @@ import ( "strings" "time" + promapi "github.com/prometheus/client_golang/api" "github.com/spf13/pflag" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -54,6 +55,7 @@ type DeschedulerServer struct { Client clientset.Interface EventClient clientset.Interface MetricsClient metricsclient.Interface + PrometheusClient promapi.Client SecureServing *apiserveroptions.SecureServingOptionsWithLoopback SecureServingInfo *apiserver.SecureServingInfo DisableMetrics bool diff --git a/kubernetes/base/rbac.yaml b/kubernetes/base/rbac.yaml index ab628cb51..cd5bdab8a 100644 --- a/kubernetes/base/rbac.yaml +++ b/kubernetes/base/rbac.yaml @@ -36,6 +36,15 @@ rules: resources: ["nodes", "pods"] verbs: ["get", "list"] --- +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: descheduler-role +rules: +- apiGroups: [""] + resources: ["secrets"] + verbs: ["get", "list", "watch"] +--- apiVersion: v1 kind: ServiceAccount metadata: @@ -54,3 +63,16 @@ subjects: - name: descheduler-sa kind: ServiceAccount namespace: kube-system +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: descheduler-role-binding +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: 
descheduler-role +subjects: + - name: descheduler-sa + kind: ServiceAccount + namespace: kube-system diff --git a/pkg/api/types.go b/pkg/api/types.go index 43f14bef7..8f4f41d20 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -115,6 +115,9 @@ const ( // KubernetesMetrics enables metrics from a Kubernetes metrics server. // Please see https://kubernetes-sigs.github.io/metrics-server/ for more. KubernetesMetrics MetricsSource = "KubernetesMetrics" + + // PrometheusMetrics enables metrics from a Prometheus metrics server. + PrometheusMetrics MetricsSource = "Prometheus" ) // MetricsCollector configures collection of metrics about actual resource utilization @@ -128,7 +131,32 @@ type MetricsCollector struct { type MetricsProvider struct { // Source enables metrics from Kubernetes metrics server. Source MetricsSource + + // Prometheus enables metrics collection through Prometheus + Prometheus *Prometheus } // ReferencedResourceList is an adaption of v1.ResourceList with resources as references type ReferencedResourceList = map[v1.ResourceName]*resource.Quantity + +type Prometheus struct { + URL string + // authToken used for authentication with the prometheus server. + // If not set the in cluster authentication token for the descheduler service + // account is read from the container's file system. + AuthToken *AuthToken +} + +type AuthToken struct { + // secretReference references an authentication token. + // secrets are expected to be created under the descheduler's namespace. + SecretReference *SecretReference +} + +// SecretReference holds a reference to a Secret +type SecretReference struct { + // namespace is the namespace of the secret. + Namespace string + // name is the name of the secret. 
+ Name string +} diff --git a/pkg/api/v1alpha2/types.go b/pkg/api/v1alpha2/types.go index e72ecf343..f9ae00f72 100644 --- a/pkg/api/v1alpha2/types.go +++ b/pkg/api/v1alpha2/types.go @@ -90,6 +90,9 @@ const ( // KubernetesMetrics enables metrics from a Kubernetes metrics server. // Please see https://kubernetes-sigs.github.io/metrics-server/ for more. KubernetesMetrics MetricsSource = "KubernetesMetrics" + + // PrometheusMetrics enables metrics from a Prometheus metrics server. + PrometheusMetrics MetricsSource = "Prometheus" ) // MetricsCollector configures collection of metrics about actual resource utilization @@ -103,4 +106,29 @@ type MetricsCollector struct { type MetricsProvider struct { // Source enables metrics from Kubernetes metrics server. Source MetricsSource `json:"source,omitempty"` + + // Prometheus enables metrics collection through Prometheus + Prometheus *Prometheus `json:"prometheus,omitempty"` +} + +type Prometheus struct { + URL string `json:"url,omitempty"` + // authToken used for authentication with the prometheus server. + // If not set the in cluster authentication token for the descheduler service + // account is read from the container's file system. + AuthToken *AuthToken `json:"authToken,omitempty"` +} + +type AuthToken struct { + // secretReference references an authentication token. + // secrets are expected to be created under the descheduler's namespace. + SecretReference *SecretReference `json:"secretReference,omitempty"` +} + +// SecretReference holds a reference to a Secret +type SecretReference struct { + // namespace is the namespace of the secret. + Namespace string `json:"namespace,omitempty"` + // name is the name of the secret. 
+ Name string `json:"name,omitempty"` } diff --git a/pkg/api/v1alpha2/zz_generated.conversion.go b/pkg/api/v1alpha2/zz_generated.conversion.go index 01e2c6b85..ad4d7fa7b 100644 --- a/pkg/api/v1alpha2/zz_generated.conversion.go +++ b/pkg/api/v1alpha2/zz_generated.conversion.go @@ -36,6 +36,16 @@ func init() { // RegisterConversions adds conversion functions to the given scheme. // Public to allow building arbitrary schemes. func RegisterConversions(s *runtime.Scheme) error { + if err := s.AddGeneratedConversionFunc((*AuthToken)(nil), (*api.AuthToken)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha2_AuthToken_To_api_AuthToken(a.(*AuthToken), b.(*api.AuthToken), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*api.AuthToken)(nil), (*AuthToken)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_api_AuthToken_To_v1alpha2_AuthToken(a.(*api.AuthToken), b.(*AuthToken), scope) + }); err != nil { + return err + } if err := s.AddGeneratedConversionFunc((*DeschedulerProfile)(nil), (*api.DeschedulerProfile)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_v1alpha2_DeschedulerProfile_To_api_DeschedulerProfile(a.(*DeschedulerProfile), b.(*api.DeschedulerProfile), scope) }); err != nil { @@ -91,6 +101,26 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } + if err := s.AddGeneratedConversionFunc((*Prometheus)(nil), (*api.Prometheus)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha2_Prometheus_To_api_Prometheus(a.(*Prometheus), b.(*api.Prometheus), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*api.Prometheus)(nil), (*Prometheus)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_api_Prometheus_To_v1alpha2_Prometheus(a.(*api.Prometheus), b.(*Prometheus), scope) + }); err != nil { + return err + } + if err := 
s.AddGeneratedConversionFunc((*SecretReference)(nil), (*api.SecretReference)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha2_SecretReference_To_api_SecretReference(a.(*SecretReference), b.(*api.SecretReference), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*api.SecretReference)(nil), (*SecretReference)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_api_SecretReference_To_v1alpha2_SecretReference(a.(*api.SecretReference), b.(*SecretReference), scope) + }); err != nil { + return err + } if err := s.AddConversionFunc((*api.DeschedulerPolicy)(nil), (*DeschedulerPolicy)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_api_DeschedulerPolicy_To_v1alpha2_DeschedulerPolicy(a.(*api.DeschedulerPolicy), b.(*DeschedulerPolicy), scope) }); err != nil { @@ -109,6 +139,26 @@ func RegisterConversions(s *runtime.Scheme) error { return nil } +func autoConvert_v1alpha2_AuthToken_To_api_AuthToken(in *AuthToken, out *api.AuthToken, s conversion.Scope) error { + out.SecretReference = (*api.SecretReference)(unsafe.Pointer(in.SecretReference)) + return nil +} + +// Convert_v1alpha2_AuthToken_To_api_AuthToken is an autogenerated conversion function. +func Convert_v1alpha2_AuthToken_To_api_AuthToken(in *AuthToken, out *api.AuthToken, s conversion.Scope) error { + return autoConvert_v1alpha2_AuthToken_To_api_AuthToken(in, out, s) +} + +func autoConvert_api_AuthToken_To_v1alpha2_AuthToken(in *api.AuthToken, out *AuthToken, s conversion.Scope) error { + out.SecretReference = (*SecretReference)(unsafe.Pointer(in.SecretReference)) + return nil +} + +// Convert_api_AuthToken_To_v1alpha2_AuthToken is an autogenerated conversion function. 
+func Convert_api_AuthToken_To_v1alpha2_AuthToken(in *api.AuthToken, out *AuthToken, s conversion.Scope) error { + return autoConvert_api_AuthToken_To_v1alpha2_AuthToken(in, out, s) +} + func autoConvert_v1alpha2_DeschedulerPolicy_To_api_DeschedulerPolicy(in *DeschedulerPolicy, out *api.DeschedulerPolicy, s conversion.Scope) error { if in.Profiles != nil { in, out := &in.Profiles, &out.Profiles @@ -225,6 +275,7 @@ func Convert_api_MetricsCollector_To_v1alpha2_MetricsCollector(in *api.MetricsCo func autoConvert_v1alpha2_MetricsProvider_To_api_MetricsProvider(in *MetricsProvider, out *api.MetricsProvider, s conversion.Scope) error { out.Source = api.MetricsSource(in.Source) + out.Prometheus = (*api.Prometheus)(unsafe.Pointer(in.Prometheus)) return nil } @@ -235,6 +286,7 @@ func Convert_v1alpha2_MetricsProvider_To_api_MetricsProvider(in *MetricsProvider func autoConvert_api_MetricsProvider_To_v1alpha2_MetricsProvider(in *api.MetricsProvider, out *MetricsProvider, s conversion.Scope) error { out.Source = MetricsSource(in.Source) + out.Prometheus = (*Prometheus)(unsafe.Pointer(in.Prometheus)) return nil } @@ -339,3 +391,47 @@ func autoConvert_api_Plugins_To_v1alpha2_Plugins(in *api.Plugins, out *Plugins, func Convert_api_Plugins_To_v1alpha2_Plugins(in *api.Plugins, out *Plugins, s conversion.Scope) error { return autoConvert_api_Plugins_To_v1alpha2_Plugins(in, out, s) } + +func autoConvert_v1alpha2_Prometheus_To_api_Prometheus(in *Prometheus, out *api.Prometheus, s conversion.Scope) error { + out.URL = in.URL + out.AuthToken = (*api.AuthToken)(unsafe.Pointer(in.AuthToken)) + return nil +} + +// Convert_v1alpha2_Prometheus_To_api_Prometheus is an autogenerated conversion function. 
+func Convert_v1alpha2_Prometheus_To_api_Prometheus(in *Prometheus, out *api.Prometheus, s conversion.Scope) error { + return autoConvert_v1alpha2_Prometheus_To_api_Prometheus(in, out, s) +} + +func autoConvert_api_Prometheus_To_v1alpha2_Prometheus(in *api.Prometheus, out *Prometheus, s conversion.Scope) error { + out.URL = in.URL + out.AuthToken = (*AuthToken)(unsafe.Pointer(in.AuthToken)) + return nil +} + +// Convert_api_Prometheus_To_v1alpha2_Prometheus is an autogenerated conversion function. +func Convert_api_Prometheus_To_v1alpha2_Prometheus(in *api.Prometheus, out *Prometheus, s conversion.Scope) error { + return autoConvert_api_Prometheus_To_v1alpha2_Prometheus(in, out, s) +} + +func autoConvert_v1alpha2_SecretReference_To_api_SecretReference(in *SecretReference, out *api.SecretReference, s conversion.Scope) error { + out.Namespace = in.Namespace + out.Name = in.Name + return nil +} + +// Convert_v1alpha2_SecretReference_To_api_SecretReference is an autogenerated conversion function. +func Convert_v1alpha2_SecretReference_To_api_SecretReference(in *SecretReference, out *api.SecretReference, s conversion.Scope) error { + return autoConvert_v1alpha2_SecretReference_To_api_SecretReference(in, out, s) +} + +func autoConvert_api_SecretReference_To_v1alpha2_SecretReference(in *api.SecretReference, out *SecretReference, s conversion.Scope) error { + out.Namespace = in.Namespace + out.Name = in.Name + return nil +} + +// Convert_api_SecretReference_To_v1alpha2_SecretReference is an autogenerated conversion function. 
+func Convert_api_SecretReference_To_v1alpha2_SecretReference(in *api.SecretReference, out *SecretReference, s conversion.Scope) error { + return autoConvert_api_SecretReference_To_v1alpha2_SecretReference(in, out, s) +} diff --git a/pkg/api/v1alpha2/zz_generated.deepcopy.go b/pkg/api/v1alpha2/zz_generated.deepcopy.go index ab0b94e16..5de5bfd72 100644 --- a/pkg/api/v1alpha2/zz_generated.deepcopy.go +++ b/pkg/api/v1alpha2/zz_generated.deepcopy.go @@ -25,6 +25,27 @@ import ( runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AuthToken) DeepCopyInto(out *AuthToken) { + *out = *in + if in.SecretReference != nil { + in, out := &in.SecretReference, &out.SecretReference + *out = new(SecretReference) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AuthToken. +func (in *AuthToken) DeepCopy() *AuthToken { + if in == nil { + return nil + } + out := new(AuthToken) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DeschedulerPolicy) DeepCopyInto(out *DeschedulerPolicy) { *out = *in @@ -69,7 +90,9 @@ func (in *DeschedulerPolicy) DeepCopyInto(out *DeschedulerPolicy) { if in.MetricsProviders != nil { in, out := &in.MetricsProviders, &out.MetricsProviders *out = make([]MetricsProvider, len(*in)) - copy(*out, *in) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } } if in.GracePeriodSeconds != nil { in, out := &in.GracePeriodSeconds, &out.GracePeriodSeconds @@ -140,6 +163,11 @@ func (in *MetricsCollector) DeepCopy() *MetricsCollector { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *MetricsProvider) DeepCopyInto(out *MetricsProvider) { *out = *in + if in.Prometheus != nil { + in, out := &in.Prometheus, &out.Prometheus + *out = new(Prometheus) + (*in).DeepCopyInto(*out) + } return } @@ -217,3 +245,40 @@ func (in *Plugins) DeepCopy() *Plugins { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Prometheus) DeepCopyInto(out *Prometheus) { + *out = *in + if in.AuthToken != nil { + in, out := &in.AuthToken, &out.AuthToken + *out = new(AuthToken) + (*in).DeepCopyInto(*out) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Prometheus. +func (in *Prometheus) DeepCopy() *Prometheus { + if in == nil { + return nil + } + out := new(Prometheus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SecretReference) DeepCopyInto(out *SecretReference) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SecretReference. +func (in *SecretReference) DeepCopy() *SecretReference { + if in == nil { + return nil + } + out := new(SecretReference) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/api/zz_generated.deepcopy.go b/pkg/api/zz_generated.deepcopy.go index f38b9e124..01c2c31bc 100644 --- a/pkg/api/zz_generated.deepcopy.go +++ b/pkg/api/zz_generated.deepcopy.go @@ -25,6 +25,27 @@ import ( runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *AuthToken) DeepCopyInto(out *AuthToken) { + *out = *in + if in.SecretReference != nil { + in, out := &in.SecretReference, &out.SecretReference + *out = new(SecretReference) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AuthToken. +func (in *AuthToken) DeepCopy() *AuthToken { + if in == nil { + return nil + } + out := new(AuthToken) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DeschedulerPolicy) DeepCopyInto(out *DeschedulerPolicy) { *out = *in @@ -69,7 +90,9 @@ func (in *DeschedulerPolicy) DeepCopyInto(out *DeschedulerPolicy) { if in.MetricsProviders != nil { in, out := &in.MetricsProviders, &out.MetricsProviders *out = make([]MetricsProvider, len(*in)) - copy(*out, *in) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } } if in.GracePeriodSeconds != nil { in, out := &in.GracePeriodSeconds, &out.GracePeriodSeconds @@ -161,6 +184,11 @@ func (in *MetricsCollector) DeepCopy() *MetricsCollector { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *MetricsProvider) DeepCopyInto(out *MetricsProvider) { *out = *in + if in.Prometheus != nil { + in, out := &in.Prometheus, &out.Prometheus + *out = new(Prometheus) + (*in).DeepCopyInto(*out) + } return } @@ -288,6 +316,27 @@ func (in *PriorityThreshold) DeepCopy() *PriorityThreshold { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Prometheus) DeepCopyInto(out *Prometheus) { + *out = *in + if in.AuthToken != nil { + in, out := &in.AuthToken, &out.AuthToken + *out = new(AuthToken) + (*in).DeepCopyInto(*out) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Prometheus. 
+func (in *Prometheus) DeepCopy() *Prometheus { + if in == nil { + return nil + } + out := new(Prometheus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in ResourceThresholds) DeepCopyInto(out *ResourceThresholds) { { @@ -309,3 +358,19 @@ func (in ResourceThresholds) DeepCopy() ResourceThresholds { in.DeepCopyInto(out) return *out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SecretReference) DeepCopyInto(out *SecretReference) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SecretReference. +func (in *SecretReference) DeepCopy() *SecretReference { + if in == nil { + return nil + } + out := new(SecretReference) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/descheduler/client/client.go b/pkg/descheduler/client/client.go index c5ff5e018..226830f0d 100644 --- a/pkg/descheduler/client/client.go +++ b/pkg/descheduler/client/client.go @@ -17,17 +17,30 @@ limitations under the License. package client import ( + "crypto/tls" + "crypto/x509" "fmt" + "io/ioutil" + "net" + "net/http" + "net/url" + "time" + + promapi "github.com/prometheus/client_golang/api" + "github.com/prometheus/common/config" // Ensure to load all auth plugins. 
clientset "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/plugin/pkg/client/auth" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/transport" componentbaseconfig "k8s.io/component-base/config" metricsclient "k8s.io/metrics/pkg/client/clientset/versioned" ) +var K8sPodCAFilePath = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" + func createConfig(clientConnection componentbaseconfig.ClientConnectionConfiguration, userAgt string) (*rest.Config, error) { var cfg *rest.Config if len(clientConnection.Kubeconfig) != 0 { @@ -94,3 +107,61 @@ func GetMasterFromKubeconfig(filename string) (string, error) { } return "", fmt.Errorf("failed to get master address from kubeconfig: cluster information not found") } + +func loadCAFile(filepath string) (*x509.CertPool, error) { + caCert, err := ioutil.ReadFile(filepath) + if err != nil { + return nil, err + } + + caCertPool := x509.NewCertPool() + if ok := caCertPool.AppendCertsFromPEM(caCert); !ok { + return nil, fmt.Errorf("failed to append CA certificate to the pool") + } + + return caCertPool, nil +} + +func CreatePrometheusClient(prometheusURL, authToken string) (promapi.Client, *http.Transport, error) { + // Retrieve Pod CA cert + caCertPool, err := loadCAFile(K8sPodCAFilePath) + if err != nil { + return nil, nil, fmt.Errorf("Error loading CA file: %v", err) + } + + // Get Prometheus Host + u, err := url.Parse(prometheusURL) + if err != nil { + return nil, nil, fmt.Errorf("Error parsing prometheus URL: %v", err) + } + t := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + TLSClientConfig: &tls.Config{ + RootCAs: caCertPool, + ServerName: u.Host, + }, + } + roundTripper := transport.NewBearerAuthRoundTripper( + authToken, + t, + ) + + if authToken != "" { + client, err := 
promapi.NewClient(promapi.Config{ + Address: prometheusURL, + RoundTripper: config.NewAuthorizationCredentialsRoundTripper("Bearer", config.NewInlineSecret(authToken), roundTripper), + }) + return client, t, err + } + client, err := promapi.NewClient(promapi.Config{ + Address: prometheusURL, + }) + return client, t, err +} diff --git a/pkg/descheduler/descheduler.go b/pkg/descheduler/descheduler.go index 41534d128..7a0979be2 100644 --- a/pkg/descheduler/descheduler.go +++ b/pkg/descheduler/descheduler.go @@ -20,9 +20,12 @@ import ( "context" "fmt" "math" + "net/http" "strconv" "time" + promapi "github.com/prometheus/client_golang/api" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -30,18 +33,24 @@ import ( policy "k8s.io/api/policy/v1" policyv1 "k8s.io/api/policy/v1" schedulingv1 "k8s.io/api/scheduling/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilversion "k8s.io/apimachinery/pkg/util/version" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/discovery" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" fakeclientset "k8s.io/client-go/kubernetes/fake" + corev1listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/rest" core "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/events" + "k8s.io/client-go/util/workqueue" componentbaseconfig "k8s.io/component-base/config" "k8s.io/klog/v2" @@ -62,6 +71,11 @@ import ( "sigs.k8s.io/descheduler/pkg/version" ) +const ( + prometheusAuthTokenSecretKey = "prometheusAuthToken" + workQueueKey = "key" +) + type eprunner func(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status type profileRunner struct { @@ -70,15 +84,21 @@ type profileRunner struct { } type descheduler struct { - rs *options.DeschedulerServer - ir 
*informerResources - getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc - sharedInformerFactory informers.SharedInformerFactory - deschedulerPolicy *api.DeschedulerPolicy - eventRecorder events.EventRecorder - podEvictor *evictions.PodEvictor - podEvictionReactionFnc func(*fakeclientset.Clientset) func(action core.Action) (bool, runtime.Object, error) - metricsCollector *metricscollector.MetricsCollector + rs *options.DeschedulerServer + ir *informerResources + getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc + sharedInformerFactory informers.SharedInformerFactory + namespacedSecretsLister corev1listers.SecretNamespaceLister + deschedulerPolicy *api.DeschedulerPolicy + eventRecorder events.EventRecorder + podEvictor *evictions.PodEvictor + podEvictionReactionFnc func(*fakeclientset.Clientset) func(action core.Action) (bool, runtime.Object, error) + metricsCollector *metricscollector.MetricsCollector + prometheusClient promapi.Client + previousPrometheusClientTransport *http.Transport + queue workqueue.RateLimitingInterface + currentPrometheusAuthToken string + metricsProviders map[api.MetricsSource]*api.MetricsProvider } type informerResources struct { @@ -125,8 +145,15 @@ func (ir *informerResources) CopyTo(fakeClient *fakeclientset.Clientset, newFact return nil } -func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string, eventRecorder events.EventRecorder, sharedInformerFactory informers.SharedInformerFactory, -) (*descheduler, error) { +func metricsProviderListToMap(providersList []api.MetricsProvider) map[api.MetricsSource]*api.MetricsProvider { + providersMap := make(map[api.MetricsSource]*api.MetricsProvider) + for _, provider := range providersList { + providersMap[provider.Source] = &provider + } + return providersMap +} + +func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, 
evictionPolicyGroupVersion string, eventRecorder events.EventRecorder, sharedInformerFactory, namespacedSharedInformerFactory informers.SharedInformerFactory) (*descheduler, error) { podInformer := sharedInformerFactory.Core().V1().Pods().Informer() ir := newInformerResources(sharedInformerFactory) @@ -165,20 +192,7 @@ func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedu return nil, err } - var metricsCollector *metricscollector.MetricsCollector - if (deschedulerPolicy.MetricsCollector != nil && deschedulerPolicy.MetricsCollector.Enabled) || (len(deschedulerPolicy.MetricsProviders) > 0 && deschedulerPolicy.MetricsProviders[0].Source == api.KubernetesMetrics) { - nodeSelector := labels.Everything() - if deschedulerPolicy.NodeSelector != nil { - sel, err := labels.Parse(*deschedulerPolicy.NodeSelector) - if err != nil { - return nil, err - } - nodeSelector = sel - } - metricsCollector = metricscollector.NewMetricsCollector(sharedInformerFactory.Core().V1().Nodes().Lister(), rs.MetricsClient, nodeSelector) - } - - return &descheduler{ + desch := &descheduler{ rs: rs, ir: ir, getPodsAssignedToNode: getPodsAssignedToNode, @@ -187,8 +201,148 @@ func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedu eventRecorder: eventRecorder, podEvictor: podEvictor, podEvictionReactionFnc: podEvictionReactionFnc, - metricsCollector: metricsCollector, - }, nil + prometheusClient: rs.PrometheusClient, + queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{Name: "descheduler"}), + metricsProviders: metricsProviderListToMap(deschedulerPolicy.MetricsProviders), + } + + if rs.MetricsClient != nil { + nodeSelector := labels.Everything() + if deschedulerPolicy.NodeSelector != nil { + sel, err := labels.Parse(*deschedulerPolicy.NodeSelector) + if err != nil { + return nil, err + } + nodeSelector = sel + } + desch.metricsCollector = 
metricscollector.NewMetricsCollector(sharedInformerFactory.Core().V1().Nodes().Lister(), rs.MetricsClient, nodeSelector) + } + + prometheusProvider := desch.metricsProviders[api.PrometheusMetrics] + if prometheusProvider != nil && prometheusProvider.Prometheus != nil && prometheusProvider.Prometheus.AuthToken != nil { + authTokenSecret := prometheusProvider.Prometheus.AuthToken.SecretReference + if authTokenSecret == nil || authTokenSecret.Namespace == "" { + return nil, fmt.Errorf("prometheus metrics source configuration is missing authentication token secret") + } + if namespacedSharedInformerFactory == nil { + return nil, fmt.Errorf("namespacedSharedInformerFactory not configured") + } + namespacedSharedInformerFactory.Core().V1().Secrets().Informer().AddEventHandler(desch.eventHandler()) + desch.namespacedSecretsLister = namespacedSharedInformerFactory.Core().V1().Secrets().Lister().Secrets(authTokenSecret.Namespace) + } + + return desch, nil +} + +func (d *descheduler) reconcileInClusterSAToken() error { + // Read the sa token and assume it has the sufficient permissions to authenticate + cfg, err := rest.InClusterConfig() + if err == nil { + if d.currentPrometheusAuthToken != cfg.BearerToken { + klog.V(2).Infof("Creating Prometheus client (with SA token)") + prometheusClient, transport, err := client.CreatePrometheusClient(d.metricsProviders[api.PrometheusMetrics].Prometheus.URL, cfg.BearerToken) + if err != nil { + return fmt.Errorf("unable to create a prometheus client: %v", err) + } + d.prometheusClient = prometheusClient + if d.previousPrometheusClientTransport != nil { + d.previousPrometheusClientTransport.CloseIdleConnections() + } + d.previousPrometheusClientTransport = transport + d.currentPrometheusAuthToken = cfg.BearerToken + } + return nil + } + if err == rest.ErrNotInCluster { + return nil + } + return fmt.Errorf("unexpected error when reading in cluster config: %v", err) +} + +func (d *descheduler) runAuthenticationSecretReconciler(ctx 
context.Context) { + defer utilruntime.HandleCrash() + defer d.queue.ShutDown() + + klog.Infof("Starting authentication secret reconciler") + defer klog.Infof("Shutting down authentication secret reconciler") + + go wait.UntilWithContext(ctx, d.runAuthenticationSecretReconcilerWorker, time.Second) + + <-ctx.Done() +} + +func (d *descheduler) runAuthenticationSecretReconcilerWorker(ctx context.Context) { + for d.processNextWorkItem(ctx) { + } +} + +func (d *descheduler) processNextWorkItem(ctx context.Context) bool { + dsKey, quit := d.queue.Get() + if quit { + return false + } + defer d.queue.Done(dsKey) + + err := d.sync() + if err == nil { + d.queue.Forget(dsKey) + return true + } + + utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err)) + d.queue.AddRateLimited(dsKey) + + return true +} + +func (d *descheduler) sync() error { + prometheusConfig := d.metricsProviders[api.PrometheusMetrics].Prometheus + if prometheusConfig == nil || prometheusConfig.AuthToken == nil || prometheusConfig.AuthToken.SecretReference == nil { + return fmt.Errorf("prometheus metrics source configuration is missing authentication token secret") + } + ns := prometheusConfig.AuthToken.SecretReference.Namespace + name := prometheusConfig.AuthToken.SecretReference.Name + secretObj, err := d.namespacedSecretsLister.Get(name) + if err != nil { + // clear the token if the secret is not found + if apierrors.IsNotFound(err) { + d.currentPrometheusAuthToken = "" + if d.previousPrometheusClientTransport != nil { + d.previousPrometheusClientTransport.CloseIdleConnections() + } + d.previousPrometheusClientTransport = nil + d.prometheusClient = nil + } + return fmt.Errorf("unable to get %v/%v secret", ns, name) + } + authToken := string(secretObj.Data[prometheusAuthTokenSecretKey]) + if authToken == "" { + return fmt.Errorf("prometheus authentication token secret missing %q data or empty", prometheusAuthTokenSecretKey) + } + if d.currentPrometheusAuthToken == authToken { + return nil + 
} + + klog.V(2).Infof("authentication secret token updated, recreating prometheus client") + prometheusClient, transport, err := client.CreatePrometheusClient(prometheusConfig.URL, authToken) + if err != nil { + return fmt.Errorf("unable to create a prometheus client: %v", err) + } + d.prometheusClient = prometheusClient + if d.previousPrometheusClientTransport != nil { + d.previousPrometheusClientTransport.CloseIdleConnections() + } + d.previousPrometheusClientTransport = transport + d.currentPrometheusAuthToken = authToken + return nil +} + +func (d *descheduler) eventHandler() cache.ResourceEventHandler { + return cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { d.queue.Add(workQueueKey) }, + UpdateFunc: func(old, new interface{}) { d.queue.Add(workQueueKey) }, + DeleteFunc: func(obj interface{}) { d.queue.Add(workQueueKey) }, + } } func (d *descheduler) runDeschedulerLoop(ctx context.Context, nodes []*v1.Node) error { @@ -268,6 +422,7 @@ func (d *descheduler) runProfiles(ctx context.Context, client clientset.Interfac frameworkprofile.WithPodEvictor(d.podEvictor), frameworkprofile.WithGetPodsAssignedToNodeFnc(d.getPodsAssignedToNode), frameworkprofile.WithMetricsCollector(d.metricsCollector), + frameworkprofile.WithPrometheusClient(d.prometheusClient), ) if err != nil { klog.ErrorS(err, "unable to create a profile", "profile", profile.Name) @@ -332,7 +487,7 @@ func Run(ctx context.Context, rs *options.DeschedulerServer) error { return err } - if (deschedulerPolicy.MetricsCollector != nil && deschedulerPolicy.MetricsCollector.Enabled) || (len(deschedulerPolicy.MetricsProviders) > 0 && deschedulerPolicy.MetricsProviders[0].Source == api.KubernetesMetrics) { + if (deschedulerPolicy.MetricsCollector != nil && deschedulerPolicy.MetricsCollector.Enabled) || metricsProviderListToMap(deschedulerPolicy.MetricsProviders)[api.KubernetesMetrics] != nil { metricsClient, err := client.CreateMetricsClient(clientConnection, "descheduler") if err != nil { 
return err @@ -415,6 +570,14 @@ func podEvictionReactionFnc(fakeClient *fakeclientset.Clientset) func(action cor } } +type tokenReconciliation int + +const ( + noReconciliation tokenReconciliation = iota + inClusterReconciliation + secretReconciliation +) + func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string) error { var span trace.Span ctx, span = tracing.Tracer().Start(ctx, "RunDeschedulerStrategies") @@ -436,7 +599,22 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer eventBroadcaster, eventRecorder := utils.GetRecorderAndBroadcaster(ctx, eventClient) defer eventBroadcaster.Shutdown() - descheduler, err := newDescheduler(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion, eventRecorder, sharedInformerFactory) + var namespacedSharedInformerFactory informers.SharedInformerFactory + metricProviderTokenReconciliation := noReconciliation + + prometheusProvider := metricsProviderListToMap(deschedulerPolicy.MetricsProviders)[api.PrometheusMetrics] + if prometheusProvider != nil && prometheusProvider.Prometheus != nil && prometheusProvider.Prometheus.URL != "" { + if prometheusProvider.Prometheus.AuthToken != nil { + // Will get reconciled + namespacedSharedInformerFactory = informers.NewSharedInformerFactoryWithOptions(rs.Client, 0, informers.WithTransform(trimManagedFields), informers.WithNamespace(prometheusProvider.Prometheus.AuthToken.SecretReference.Namespace)) + metricProviderTokenReconciliation = secretReconciliation + } else { + // Use the sa token and assume it has the sufficient permissions to authenticate + metricProviderTokenReconciliation = inClusterReconciliation + } + } + + descheduler, err := newDescheduler(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion, eventRecorder, sharedInformerFactory, namespacedSharedInformerFactory) if err != nil { span.AddEvent("Failed to create new descheduler", 
trace.WithAttributes(attribute.String("err", err.Error()))) return err @@ -445,10 +623,17 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer defer cancel() sharedInformerFactory.Start(ctx.Done()) + if metricProviderTokenReconciliation == secretReconciliation { + namespacedSharedInformerFactory.Start(ctx.Done()) + } + sharedInformerFactory.WaitForCacheSync(ctx.Done()) descheduler.podEvictor.WaitForEventHandlersSync(ctx) + if metricProviderTokenReconciliation == secretReconciliation { + namespacedSharedInformerFactory.WaitForCacheSync(ctx.Done()) + } - if (deschedulerPolicy.MetricsCollector != nil && deschedulerPolicy.MetricsCollector.Enabled) || (len(deschedulerPolicy.MetricsProviders) > 0 && deschedulerPolicy.MetricsProviders[0].Source == api.KubernetesMetrics) { + if descheduler.metricsCollector != nil { go func() { klog.V(2).Infof("Starting metrics collector") descheduler.metricsCollector.Run(ctx) @@ -462,7 +647,19 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer } } + if metricProviderTokenReconciliation == secretReconciliation { + go descheduler.runAuthenticationSecretReconciler(ctx) + } + wait.NonSlidingUntil(func() { + if metricProviderTokenReconciliation == inClusterReconciliation { + // Read the sa token and assume it has the sufficient permissions to authenticate + if err := descheduler.reconcileInClusterSAToken(); err != nil { + klog.ErrorS(err, "unable to reconcile an in cluster SA token") + return + } + } + // A next context is created here intentionally to avoid nesting the spans via context. 
sCtx, sSpan := tracing.Tracer().Start(ctx, "NonSlidingUntil") defer sSpan.End() diff --git a/pkg/descheduler/descheduler_test.go b/pkg/descheduler/descheduler_test.go index bea4ede85..032658996 100644 --- a/pkg/descheduler/descheduler_test.go +++ b/pkg/descheduler/descheduler_test.go @@ -193,7 +193,7 @@ func initDescheduler(t *testing.T, ctx context.Context, featureGates featuregate sharedInformerFactory := informers.NewSharedInformerFactoryWithOptions(rs.Client, 0, informers.WithTransform(trimManagedFields)) eventBroadcaster, eventRecorder := utils.GetRecorderAndBroadcaster(ctx, client) - descheduler, err := newDescheduler(ctx, rs, internalDeschedulerPolicy, "v1", eventRecorder, sharedInformerFactory) + descheduler, err := newDescheduler(ctx, rs, internalDeschedulerPolicy, "v1", eventRecorder, sharedInformerFactory, nil) if err != nil { eventBroadcaster.Shutdown() t.Fatalf("Unable to create a descheduler instance: %v", err) diff --git a/pkg/descheduler/policyconfig.go b/pkg/descheduler/policyconfig.go index 8cdbff66c..ec8443aa8 100644 --- a/pkg/descheduler/policyconfig.go +++ b/pkg/descheduler/policyconfig.go @@ -19,6 +19,7 @@ package descheduler import ( "context" "fmt" + "net/url" "os" utilerrors "k8s.io/apimachinery/pkg/util/errors" @@ -154,11 +155,42 @@ func validateDeschedulerConfiguration(in api.DeschedulerPolicy, registry pluginr } } } - if len(in.MetricsProviders) > 1 { - errorsInPolicy = append(errorsInPolicy, fmt.Errorf("only a single metrics provider can be set, got %v instead", len(in.MetricsProviders))) + providers := map[api.MetricsSource]api.MetricsProvider{} + for _, provider := range in.MetricsProviders { + if _, ok := providers[provider.Source]; ok { + errorsInPolicy = append(errorsInPolicy, fmt.Errorf("metric provider %q is already configured, each source can be configured only once", provider.Source)) + } else { + providers[provider.Source] = provider + } } - if len(in.MetricsProviders) > 0 && in.MetricsCollector != nil && 
in.MetricsCollector.Enabled { + if _, exists := providers[api.KubernetesMetrics]; exists && in.MetricsCollector != nil && in.MetricsCollector.Enabled { errorsInPolicy = append(errorsInPolicy, fmt.Errorf("it is not allowed to combine metrics provider when metrics collector is enabled")) } + if prometheusConfig, exists := providers[api.PrometheusMetrics]; exists { + if prometheusConfig.Prometheus == nil { + errorsInPolicy = append(errorsInPolicy, fmt.Errorf("prometheus configuration is required when prometheus source is enabled")) + } else { + if prometheusConfig.Prometheus.URL == "" { + errorsInPolicy = append(errorsInPolicy, fmt.Errorf("prometheus URL is required when prometheus is enabled")) + } else { + u, err := url.Parse(prometheusConfig.Prometheus.URL) + if err != nil { + errorsInPolicy = append(errorsInPolicy, fmt.Errorf("error parsing prometheus URL: %v", err)) + } else if u.Scheme != "https" { + errorsInPolicy = append(errorsInPolicy, fmt.Errorf("prometheus URL's scheme is not https, got %q instead", u.Scheme)) + } + } + + if prometheusConfig.Prometheus.AuthToken != nil { + secretRef := prometheusConfig.Prometheus.AuthToken.SecretReference + if secretRef == nil { + errorsInPolicy = append(errorsInPolicy, fmt.Errorf("prometheus authToken secret is expected to be set when authToken field is")) + } else if secretRef.Name == "" || secretRef.Namespace == "" { + errorsInPolicy = append(errorsInPolicy, fmt.Errorf("prometheus authToken secret reference does not set both namespace and name")) + } + } + } + } + return utilerrors.NewAggregate(errorsInPolicy) } diff --git a/pkg/descheduler/policyconfig_test.go b/pkg/descheduler/policyconfig_test.go index 8199003da..6689dce1f 100644 --- a/pkg/descheduler/policyconfig_test.go +++ b/pkg/descheduler/policyconfig_test.go @@ -212,14 +212,14 @@ func TestValidateDeschedulerConfiguration(t *testing.T) { result: fmt.Errorf("[in profile RemoveFailedPods: only one of Include/Exclude namespaces can be set, in profile 
RemovePodsViolatingTopologySpreadConstraint: only one of Include/Exclude namespaces can be set]"), }, { - description: "Too many metrics providers error", + description: "Duplicit metrics providers error", deschedulerPolicy: api.DeschedulerPolicy{ MetricsProviders: []api.MetricsProvider{ {Source: api.KubernetesMetrics}, {Source: api.KubernetesMetrics}, }, }, - result: fmt.Errorf("only a single metrics provider can be set, got 2 instead"), + result: fmt.Errorf("metric provider \"KubernetesMetrics\" is already configured, each source can be configured only once"), }, { description: "Too many metrics providers error", @@ -233,12 +233,145 @@ func TestValidateDeschedulerConfiguration(t *testing.T) { }, result: fmt.Errorf("it is not allowed to combine metrics provider when metrics collector is enabled"), }, + { + description: "missing prometheus url error", + deschedulerPolicy: api.DeschedulerPolicy{ + MetricsProviders: []api.MetricsProvider{ + { + Source: api.PrometheusMetrics, + Prometheus: &api.Prometheus{}, + }, + }, + }, + result: fmt.Errorf("prometheus URL is required when prometheus is enabled"), + }, + { + description: "prometheus url is not valid error", + deschedulerPolicy: api.DeschedulerPolicy{ + MetricsProviders: []api.MetricsProvider{ + { + Source: api.PrometheusMetrics, + Prometheus: &api.Prometheus{ + URL: "http://example.com:-80", + }, + }, + }, + }, + result: fmt.Errorf("error parsing prometheus URL: parse \"http://example.com:-80\": invalid port \":-80\" after host"), + }, + { + description: "prometheus url does not have https error", + deschedulerPolicy: api.DeschedulerPolicy{ + MetricsProviders: []api.MetricsProvider{ + { + Source: api.PrometheusMetrics, + Prometheus: &api.Prometheus{ + URL: "http://example.com:80", + }, + }, + }, + }, + result: fmt.Errorf("prometheus URL's scheme is not https, got \"http\" instead"), + }, + { + description: "prometheus authtoken with no secret reference error", + deschedulerPolicy: api.DeschedulerPolicy{ + 
MetricsProviders: []api.MetricsProvider{ + { + Source: api.PrometheusMetrics, + Prometheus: &api.Prometheus{ + URL: "https://example.com:80", + AuthToken: &api.AuthToken{}, + }, + }, + }, + }, + result: fmt.Errorf("prometheus authToken secret is expected to be set when authToken field is"), + }, + { + description: "prometheus authtoken with empty secret reference error", + deschedulerPolicy: api.DeschedulerPolicy{ + MetricsProviders: []api.MetricsProvider{ + { + Source: api.PrometheusMetrics, + Prometheus: &api.Prometheus{ + URL: "https://example.com:80", + AuthToken: &api.AuthToken{ + SecretReference: &api.SecretReference{}, + }, + }, + }, + }, + }, + result: fmt.Errorf("prometheus authToken secret reference does not set both namespace and name"), + }, + { + description: "prometheus authtoken missing secret reference namespace error", + deschedulerPolicy: api.DeschedulerPolicy{ + MetricsProviders: []api.MetricsProvider{ + { + Source: api.PrometheusMetrics, + Prometheus: &api.Prometheus{ + URL: "https://example.com:80", + AuthToken: &api.AuthToken{ + SecretReference: &api.SecretReference{ + Name: "secretname", + }, + }, + }, + }, + }, + }, + result: fmt.Errorf("prometheus authToken secret reference does not set both namespace and name"), + }, + { + description: "prometheus authtoken missing secret reference name error", + deschedulerPolicy: api.DeschedulerPolicy{ + MetricsProviders: []api.MetricsProvider{ + { + Source: api.PrometheusMetrics, + Prometheus: &api.Prometheus{ + URL: "https://example.com:80", + AuthToken: &api.AuthToken{ + SecretReference: &api.SecretReference{ + Namespace: "secretnamespace", + }, + }, + }, + }, + }, + }, + result: fmt.Errorf("prometheus authToken secret reference does not set both namespace and name"), + }, + { + description: "valid prometheus authtoken secret reference", + deschedulerPolicy: api.DeschedulerPolicy{ + MetricsProviders: []api.MetricsProvider{ + { + Source: api.PrometheusMetrics, + Prometheus: &api.Prometheus{ + URL: 
"https://example.com:80", + AuthToken: &api.AuthToken{ + SecretReference: &api.SecretReference{ + Name: "secretname", + Namespace: "secretnamespace", + }, + }, + }, + }, + }, + }, + }, } for _, tc := range testCases { t.Run(tc.description, func(t *testing.T) { result := validateDeschedulerConfiguration(tc.deschedulerPolicy, pluginregistry.PluginRegistry) - if result.Error() != tc.result.Error() { + if result == nil && tc.result != nil || result != nil && tc.result == nil { + t.Errorf("test '%s' failed. expected \n'%s', got \n'%s'", tc.description, tc.result, result) + } else if result == nil && tc.result == nil { + return + } else if result.Error() != tc.result.Error() { t.Errorf("test '%s' failed. expected \n'%s', got \n'%s'", tc.description, tc.result, result) } }) diff --git a/pkg/framework/fake/fake.go b/pkg/framework/fake/fake.go index d27422893..681a9275b 100644 --- a/pkg/framework/fake/fake.go +++ b/pkg/framework/fake/fake.go @@ -11,6 +11,8 @@ import ( "sigs.k8s.io/descheduler/pkg/descheduler/metricscollector" podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod" frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types" + + promapi "github.com/prometheus/client_golang/api" ) type HandleImpl struct { @@ -20,6 +22,7 @@ type HandleImpl struct { EvictorFilterImpl frameworktypes.EvictorPlugin PodEvictorImpl *evictions.PodEvictor MetricsCollectorImpl *metricscollector.MetricsCollector + PrometheusClientImpl promapi.Client } var _ frameworktypes.Handle = &HandleImpl{} @@ -28,6 +31,10 @@ func (hi *HandleImpl) ClientSet() clientset.Interface { return hi.ClientsetImpl } +func (hi *HandleImpl) PrometheusClient() promapi.Client { + return hi.PrometheusClientImpl +} + func (hi *HandleImpl) MetricsCollector() *metricscollector.MetricsCollector { return hi.MetricsCollectorImpl } diff --git a/pkg/framework/plugins/nodeutilization/highnodeutilization.go b/pkg/framework/plugins/nodeutilization/highnodeutilization.go index 69bddb0c8..e9e412150 100644 --- 
a/pkg/framework/plugins/nodeutilization/highnodeutilization.go +++ b/pkg/framework/plugins/nodeutilization/highnodeutilization.go @@ -95,7 +95,7 @@ func (h *HighNodeUtilization) Name() string { // Balance extension point implementation for the plugin func (h *HighNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status { - if err := h.usageClient.sync(nodes); err != nil { + if err := h.usageClient.sync(ctx, nodes); err != nil { return &frameworktypes.Status{ Err: fmt.Errorf("error getting node usage: %v", err), } diff --git a/pkg/framework/plugins/nodeutilization/lownodeutilization.go b/pkg/framework/plugins/nodeutilization/lownodeutilization.go index 786a08bf6..970aea3ca 100644 --- a/pkg/framework/plugins/nodeutilization/lownodeutilization.go +++ b/pkg/framework/plugins/nodeutilization/lownodeutilization.go @@ -55,7 +55,23 @@ func NewLowNodeUtilization(args runtime.Object, handle frameworktypes.Handle) (f return nil, fmt.Errorf("want args to be of type LowNodeUtilizationArgs, got %T", args) } - setDefaultForLNUThresholds(lowNodeUtilizationArgsArgs.Thresholds, lowNodeUtilizationArgsArgs.TargetThresholds, lowNodeUtilizationArgsArgs.UseDeviationThresholds) + metricsUtilization := lowNodeUtilizationArgsArgs.MetricsUtilization + if metricsUtilization != nil && metricsUtilization.Source == api.PrometheusMetrics { + if metricsUtilization.Prometheus != nil && metricsUtilization.Prometheus.Query != "" { + uResourceNames := getResourceNames(lowNodeUtilizationArgsArgs.Thresholds) + oResourceNames := getResourceNames(lowNodeUtilizationArgsArgs.TargetThresholds) + if len(uResourceNames) != 1 || uResourceNames[0] != MetricResource { + return nil, fmt.Errorf("thresholds are expected to specify a single instance of %q resource, got %v instead", MetricResource, uResourceNames) + } + if len(oResourceNames) != 1 || oResourceNames[0] != MetricResource { + return nil, fmt.Errorf("targetThresholds are expected to specify a single instance of %q resource, 
got %v instead", MetricResource, oResourceNames) + } + } else { + return nil, fmt.Errorf("prometheus query is missing") + } + } else { + setDefaultForLNUThresholds(lowNodeUtilizationArgsArgs.Thresholds, lowNodeUtilizationArgsArgs.TargetThresholds, lowNodeUtilizationArgsArgs.UseDeviationThresholds) + } underutilizationCriteria := []interface{}{ "CPU", lowNodeUtilizationArgsArgs.Thresholds[v1.ResourceCPU], @@ -90,11 +106,23 @@ func NewLowNodeUtilization(args runtime.Object, handle frameworktypes.Handle) (f var usageClient usageClient // MetricsServer is deprecated, removed once dropped - if lowNodeUtilizationArgsArgs.MetricsUtilization != nil && (lowNodeUtilizationArgsArgs.MetricsUtilization.MetricsServer || lowNodeUtilizationArgsArgs.MetricsUtilization.Source == api.KubernetesMetrics) { - if handle.MetricsCollector() == nil { - return nil, fmt.Errorf("metrics client not initialized") + if metricsUtilization != nil { + switch { + case metricsUtilization.MetricsServer, metricsUtilization.Source == api.KubernetesMetrics: + if handle.MetricsCollector() == nil { + return nil, fmt.Errorf("metrics client not initialized") + } + usageClient = newActualUsageClient(resourceNames, handle.GetPodsAssignedToNodeFunc(), handle.MetricsCollector()) + case metricsUtilization.Source == api.PrometheusMetrics: + if handle.PrometheusClient() == nil { + return nil, fmt.Errorf("prometheus client not initialized") + } + usageClient = newPrometheusUsageClient(handle.GetPodsAssignedToNodeFunc(), handle.PrometheusClient(), metricsUtilization.Prometheus.Query) + case metricsUtilization.Source != "": + return nil, fmt.Errorf("unrecognized metrics source") + default: + return nil, fmt.Errorf("metrics source is empty") } - usageClient = newActualUsageClient(resourceNames, handle.GetPodsAssignedToNodeFunc(), handle.MetricsCollector()) } else { usageClient = newRequestedUsageClient(resourceNames, handle.GetPodsAssignedToNodeFunc()) } @@ -117,7 +145,7 @@ func (l *LowNodeUtilization) Name() string { 
// Balance extension point implementation for the plugin func (l *LowNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status { - if err := l.usageClient.sync(nodes); err != nil { + if err := l.usageClient.sync(ctx, nodes); err != nil { return &frameworktypes.Status{ Err: fmt.Errorf("error getting node usage: %v", err), } @@ -126,12 +154,20 @@ func (l *LowNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *fra nodesMap, nodesUsageMap, podListMap := getNodeUsageSnapshot(nodes, l.usageClient) var nodeThresholdsMap map[string][]api.ResourceThresholds if l.args.UseDeviationThresholds { - nodeThresholdsMap = getNodeThresholdsFromAverageNodeUsage(nodes, l.usageClient, l.args.Thresholds, l.args.TargetThresholds) + thresholds, average := getNodeThresholdsFromAverageNodeUsage(nodes, l.usageClient, l.args.Thresholds, l.args.TargetThresholds) + klog.InfoS("Average utilization through all nodes", "utilization", average) + // All nodes are expected to have the same thresholds + for nodeName := range thresholds { + klog.InfoS("Underutilization threshold based on average utilization", "threshold", thresholds[nodeName][0]) + klog.InfoS("Overutilization threshold based on average utilization", "threshold", thresholds[nodeName][1]) + break + } + nodeThresholdsMap = thresholds } else { nodeThresholdsMap = getStaticNodeThresholds(nodes, l.args.Thresholds, l.args.TargetThresholds) } - nodesUsageAsNodeThresholdsMap := nodeUsageToResourceThresholds(nodesUsageMap, nodesMap) + nodesUsageAsNodeThresholdsMap := nodeUsageToResourceThresholds(nodesUsageMap, nodesMap) nodeGroups := classifyNodeUsage( nodesUsageAsNodeThresholdsMap, nodeThresholdsMap, diff --git a/pkg/framework/plugins/nodeutilization/lownodeutilization_test.go b/pkg/framework/plugins/nodeutilization/lownodeutilization_test.go index a97022365..16a609a38 100644 --- a/pkg/framework/plugins/nodeutilization/lownodeutilization_test.go +++ 
b/pkg/framework/plugins/nodeutilization/lownodeutilization_test.go @@ -41,6 +41,8 @@ import ( frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types" "sigs.k8s.io/descheduler/pkg/utils" "sigs.k8s.io/descheduler/test" + + "github.com/prometheus/common/model" ) func TestLowNodeUtilization(t *testing.T) { @@ -1257,9 +1259,9 @@ func TestLowNodeUtilization(t *testing.T) { } handle.MetricsCollectorImpl = collector - var metricsSource api.MetricsSource = "" + var metricsUtilization *MetricsUtilization if metricsEnabled { - metricsSource = api.KubernetesMetrics + metricsUtilization = &MetricsUtilization{Source: api.KubernetesMetrics} } plugin, err := NewLowNodeUtilization(&LowNodeUtilizationArgs{ @@ -1268,9 +1270,7 @@ func TestLowNodeUtilization(t *testing.T) { UseDeviationThresholds: tc.useDeviationThresholds, EvictionLimits: tc.evictionLimits, EvictableNamespaces: tc.evictableNamespaces, - MetricsUtilization: &MetricsUtilization{ - Source: metricsSource, - }, + MetricsUtilization: metricsUtilization, }, handle) if err != nil { @@ -1444,3 +1444,241 @@ func TestLowNodeUtilizationWithTaints(t *testing.T) { }) } } + +func withLocalStorage(pod *v1.Pod) { + // A pod with local storage. + test.SetNormalOwnerRef(pod) + pod.Spec.Volumes = []v1.Volume{ + { + Name: "sample", + VolumeSource: v1.VolumeSource{ + HostPath: &v1.HostPathVolumeSource{Path: "somePath"}, + EmptyDir: &v1.EmptyDirVolumeSource{ + SizeLimit: resource.NewQuantity(int64(10), resource.BinarySI), + }, + }, + }, + } + // A Mirror Pod. + pod.Annotations = test.GetMirrorPodAnnotation() +} + +func withCriticalPod(pod *v1.Pod) { + // A Critical Pod. 
+ test.SetNormalOwnerRef(pod) + pod.Namespace = "kube-system" + priority := utils.SystemCriticalPriority + pod.Spec.Priority = &priority +} + +func TestLowNodeUtilizationWithPrometheusMetrics(t *testing.T) { + n1NodeName := "n1" + n2NodeName := "n2" + n3NodeName := "n3" + + testCases := []struct { + name string + samples model.Vector + nodes []*v1.Node + pods []*v1.Pod + expectedPodsEvicted uint + evictedPods []string + args *LowNodeUtilizationArgs + }{ + { + name: "with instance:node_cpu:rate:sum query", + args: &LowNodeUtilizationArgs{ + Thresholds: api.ResourceThresholds{ + MetricResource: 30, + }, + TargetThresholds: api.ResourceThresholds{ + MetricResource: 50, + }, + MetricsUtilization: &MetricsUtilization{ + Source: api.PrometheusMetrics, + Prometheus: &Prometheus{ + Query: "instance:node_cpu:rate:sum", + }, + }, + }, + samples: model.Vector{ + sample("instance:node_cpu:rate:sum", n1NodeName, 0.5695757575757561), + sample("instance:node_cpu:rate:sum", n2NodeName, 0.4245454545454522), + sample("instance:node_cpu:rate:sum", n3NodeName, 0.20381818181818104), + }, + nodes: []*v1.Node{ + test.BuildTestNode(n1NodeName, 4000, 3000, 9, nil), + test.BuildTestNode(n2NodeName, 4000, 3000, 10, nil), + test.BuildTestNode(n3NodeName, 4000, 3000, 10, nil), + }, + pods: []*v1.Pod{ + test.BuildTestPod("p1", 400, 0, n1NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p2", 400, 0, n1NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p3", 400, 0, n1NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p4", 400, 0, n1NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p5", 400, 0, n1NodeName, test.SetRSOwnerRef), + // These won't be evicted. 
+ test.BuildTestPod("p6", 400, 0, n1NodeName, test.SetDSOwnerRef), + test.BuildTestPod("p7", 400, 0, n1NodeName, withLocalStorage), + test.BuildTestPod("p8", 400, 0, n1NodeName, withCriticalPod), + test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef), + }, + expectedPodsEvicted: 1, + }, + { + name: "with instance:node_cpu:rate:sum query with more evictions", + args: &LowNodeUtilizationArgs{ + Thresholds: api.ResourceThresholds{ + MetricResource: 30, + }, + TargetThresholds: api.ResourceThresholds{ + MetricResource: 50, + }, + EvictionLimits: &api.EvictionLimits{ + Node: ptr.To[uint](3), + }, + MetricsUtilization: &MetricsUtilization{ + Source: api.PrometheusMetrics, + Prometheus: &Prometheus{ + Query: "instance:node_cpu:rate:sum", + }, + }, + }, + samples: model.Vector{ + sample("instance:node_cpu:rate:sum", n1NodeName, 0.5695757575757561), + sample("instance:node_cpu:rate:sum", n2NodeName, 0.4245454545454522), + sample("instance:node_cpu:rate:sum", n3NodeName, 0.20381818181818104), + }, + nodes: []*v1.Node{ + test.BuildTestNode(n1NodeName, 4000, 3000, 9, nil), + test.BuildTestNode(n2NodeName, 4000, 3000, 10, nil), + test.BuildTestNode(n3NodeName, 4000, 3000, 10, nil), + }, + pods: []*v1.Pod{ + test.BuildTestPod("p1", 400, 0, n1NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p2", 400, 0, n1NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p3", 400, 0, n1NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p4", 400, 0, n1NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p5", 400, 0, n1NodeName, test.SetRSOwnerRef), + // These won't be evicted. 
+ test.BuildTestPod("p6", 400, 0, n1NodeName, test.SetDSOwnerRef), + test.BuildTestPod("p7", 400, 0, n1NodeName, withLocalStorage), + test.BuildTestPod("p8", 400, 0, n1NodeName, withCriticalPod), + test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef), + }, + expectedPodsEvicted: 3, + }, + { + name: "with instance:node_cpu:rate:sum query with deviation", + args: &LowNodeUtilizationArgs{ + Thresholds: api.ResourceThresholds{ + MetricResource: 5, + }, + TargetThresholds: api.ResourceThresholds{ + MetricResource: 5, + }, + EvictionLimits: &api.EvictionLimits{ + Node: ptr.To[uint](2), + }, + UseDeviationThresholds: true, + MetricsUtilization: &MetricsUtilization{ + Source: api.PrometheusMetrics, + Prometheus: &Prometheus{ + Query: "instance:node_cpu:rate:sum", + }, + }, + }, + samples: model.Vector{ + sample("instance:node_cpu:rate:sum", n1NodeName, 0.5695757575757561), + sample("instance:node_cpu:rate:sum", n2NodeName, 0.4245454545454522), + sample("instance:node_cpu:rate:sum", n3NodeName, 0.20381818181818104), + }, + nodes: []*v1.Node{ + test.BuildTestNode(n1NodeName, 4000, 3000, 9, nil), + test.BuildTestNode(n2NodeName, 4000, 3000, 10, nil), + test.BuildTestNode(n3NodeName, 4000, 3000, 10, nil), + }, + pods: []*v1.Pod{ + test.BuildTestPod("p1", 400, 0, n1NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p2", 400, 0, n1NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p3", 400, 0, n1NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p4", 400, 0, n1NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p5", 400, 0, n1NodeName, test.SetRSOwnerRef), + // These won't be evicted. 
+ test.BuildTestPod("p6", 400, 0, n1NodeName, test.SetDSOwnerRef), + test.BuildTestPod("p7", 400, 0, n1NodeName, withLocalStorage), + test.BuildTestPod("p8", 400, 0, n1NodeName, withCriticalPod), + test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef), + }, + expectedPodsEvicted: 2, + }, + } + + for _, tc := range testCases { + testFnc := func(metricsEnabled bool, expectedPodsEvicted uint) func(t *testing.T) { + return func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var objs []runtime.Object + for _, node := range tc.nodes { + objs = append(objs, node) + } + for _, pod := range tc.pods { + objs = append(objs, pod) + } + + fakeClient := fake.NewSimpleClientset(objs...) + + podsForEviction := make(map[string]struct{}) + for _, pod := range tc.evictedPods { + podsForEviction[pod] = struct{}{} + } + + evictionFailed := false + if len(tc.evictedPods) > 0 { + fakeClient.Fake.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) { + getAction := action.(core.CreateAction) + obj := getAction.GetObject() + if eviction, ok := obj.(*policy.Eviction); ok { + if _, exists := podsForEviction[eviction.Name]; exists { + return true, obj, nil + } + evictionFailed = true + return true, nil, fmt.Errorf("pod %q was unexpectedly evicted", eviction.Name) + } + return true, obj, nil + }) + } + + handle, podEvictor, err := frameworktesting.InitFrameworkHandle(ctx, fakeClient, nil, defaultevictor.DefaultEvictorArgs{NodeFit: true}, nil) + if err != nil { + t.Fatalf("Unable to initialize a framework handle: %v", err) + } + + handle.PrometheusClientImpl = &fakePromClient{ + result: tc.samples, + dataType: model.ValVector, + } + plugin, err := NewLowNodeUtilization(tc.args, handle) + if err != nil { + t.Fatalf("Unable to initialize the plugin: %v", err) + } + + status := plugin.(frameworktypes.BalancePlugin).Balance(ctx, tc.nodes) + if status != nil { + t.Fatalf("Balance.err: %v", status.Err) + } + + 
podsEvicted := podEvictor.TotalEvicted() + if expectedPodsEvicted != podsEvicted { + t.Errorf("Expected %v pods to be evicted but %v got evicted", expectedPodsEvicted, podsEvicted) + } + if evictionFailed { + t.Errorf("Pod evictions failed unexpectedly") + } + } + } + t.Run(tc.name, testFnc(false, tc.expectedPodsEvicted)) + } +} diff --git a/pkg/framework/plugins/nodeutilization/nodeutilization.go b/pkg/framework/plugins/nodeutilization/nodeutilization.go index c3dd25c8a..c0d589688 100644 --- a/pkg/framework/plugins/nodeutilization/nodeutilization.go +++ b/pkg/framework/plugins/nodeutilization/nodeutilization.go @@ -57,6 +57,7 @@ import ( // - thresholds: map[string][]api.ReferencedResourceList // - pod list: map[string][]*v1.Pod // Once the nodes are classified produce the original []NodeInfo so the code is not that much changed (postponing further refactoring once it is needed) +const MetricResource = v1.ResourceName("MetricResource") // NodeUsage stores a node's info, pods on it, thresholds and its resource usage type NodeUsage struct { @@ -94,20 +95,30 @@ func normalizePercentage(percent api.Percentage) api.Percentage { return percent } +func nodeCapacity(node *v1.Node, nodeUsage api.ReferencedResourceList) v1.ResourceList { + capacity := node.Status.Capacity + if len(node.Status.Allocatable) > 0 { + capacity = node.Status.Allocatable + } + // the usage captures the metrics resource + if _, ok := nodeUsage[MetricResource]; ok { + // Make ResourceMetrics 100% => 100 points + capacity[MetricResource] = *resource.NewQuantity(int64(100), resource.DecimalSI) + } + return capacity +} + func getNodeThresholdsFromAverageNodeUsage( nodes []*v1.Node, usageClient usageClient, lowSpan, highSpan api.ResourceThresholds, -) map[string][]api.ResourceThresholds { +) (map[string][]api.ResourceThresholds, api.ResourceThresholds) { total := api.ResourceThresholds{} average := api.ResourceThresholds{} numberOfNodes := len(nodes) for _, node := range nodes { usage := 
usageClient.nodeUtilization(node.Name) - nodeCapacity := node.Status.Capacity - if len(node.Status.Allocatable) > 0 { - nodeCapacity = node.Status.Allocatable - } + nodeCapacity := nodeCapacity(node, usage) for resource, value := range usage { nodeCapacityValue := nodeCapacity[resource] if resource == v1.ResourceCPU { @@ -138,7 +149,7 @@ func getNodeThresholdsFromAverageNodeUsage( highThreshold, } } - return nodeThresholds + return nodeThresholds, average } func getStaticNodeThresholds( @@ -216,10 +227,7 @@ func roundTo2Decimals(percentage float64) float64 { } func resourceUsagePercentages(nodeUsage api.ReferencedResourceList, node *v1.Node, round bool) api.ResourceThresholds { - nodeCapacity := node.Status.Capacity - if len(node.Status.Allocatable) > 0 { - nodeCapacity = node.Status.Allocatable - } + nodeCapacity := nodeCapacity(node, nodeUsage) resourceUsagePercentage := api.ResourceThresholds{} for resourceName, resourceUsage := range nodeUsage { @@ -395,16 +403,29 @@ func evictPods( if !preEvictionFilterWithOptions(pod) { continue } + + // In case podUsage does not support resource counting (e.g. provided metric + // does not quantify pod resource utilization). 
+ unconstrainedResourceEviction := false podUsage, err := usageClient.podUsage(pod) if err != nil { - klog.Errorf("unable to get pod usage for %v/%v: %v", pod.Namespace, pod.Name, err) - continue + if _, ok := err.(*notSupportedError); !ok { + klog.Errorf("unable to get pod usage for %v/%v: %v", pod.Namespace, pod.Name, err) + continue + } + unconstrainedResourceEviction = true } err = podEvictor.Evict(ctx, pod, evictOptions) if err == nil { + if maxNoOfPodsToEvictPerNode == nil && unconstrainedResourceEviction { + klog.V(3).InfoS("Currently, only a single pod eviction is allowed") + break + } evictionCounter++ klog.V(3).InfoS("Evicted pods", "pod", klog.KObj(pod)) - + if unconstrainedResourceEviction { + continue + } for name := range totalAvailableUsage { if name == v1.ResourcePods { nodeInfo.usage[name].Sub(*resource.NewQuantity(1, resource.DecimalSI)) diff --git a/pkg/framework/plugins/nodeutilization/types.go b/pkg/framework/plugins/nodeutilization/types.go index 23e5a3ebf..e49434d9d 100644 --- a/pkg/framework/plugins/nodeutilization/types.go +++ b/pkg/framework/plugins/nodeutilization/types.go @@ -55,13 +55,24 @@ type HighNodeUtilizationArgs struct { } // MetricsUtilization allow to consume actual resource utilization from metrics +// +k8s:deepcopy-gen=true type MetricsUtilization struct { // metricsServer enables metrics from a kubernetes metrics server. // Please see https://kubernetes-sigs.github.io/metrics-server/ for more. - // Deprecated. Use MetricsSource instead. + // Deprecated. Use Source instead. MetricsServer bool `json:"metricsServer,omitempty"` // source enables the plugin to consume metrics from a metrics source. // Currently only KubernetesMetrics available. Source api.MetricsSource `json:"source,omitempty"` + + // prometheus enables metrics collection through a prometheus query. 
+	Prometheus *Prometheus `json:"prometheus,omitempty"`
+}
+
+type Prometheus struct {
+	// query returning a vector of samples, each sample labeled with `instance`
+	// corresponding to a node name with each sample value as a real number
+	// in <0; 1> interval.
+	Query string `json:"query,omitempty"`
+}
diff --git a/pkg/framework/plugins/nodeutilization/usageclients.go b/pkg/framework/plugins/nodeutilization/usageclients.go
index 44191b11c..caebca3c9 100644
--- a/pkg/framework/plugins/nodeutilization/usageclients.go
+++ b/pkg/framework/plugins/nodeutilization/usageclients.go
@@ -19,7 +19,11 @@ package nodeutilization
 import (
 	"context"
 	"fmt"
+	"time"
 
+	promapi "github.com/prometheus/client_golang/api"
+	promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
+	"github.com/prometheus/common/model"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -33,11 +37,33 @@ import (
 	"sigs.k8s.io/descheduler/pkg/utils"
 )
 
+type UsageClientType int
+
+const (
+	requestedUsageClientType UsageClientType = iota
+	actualUsageClientType
+	prometheusUsageClientType
+)
+
+type notSupportedError struct {
+	usageClientType UsageClientType
+}
+
+func (e notSupportedError) Error() string {
+	return "usage client does not support pod usage counting"
+}
+
+func newNotSupportedError(usageClientType UsageClientType) *notSupportedError {
+	return &notSupportedError{
+		usageClientType: usageClientType,
+	}
+}
+
 type usageClient interface {
 	// Both low/high node utilization plugins are expected to invoke sync right
 	// after Balance method is invoked. There's no cache invalidation so each
 	// Balance is expected to get the latest data by invoking sync.
-	sync(nodes []*v1.Node) error
+	sync(ctx context.Context, nodes []*v1.Node) error
 	nodeUtilization(node string) api.ReferencedResourceList
 	pods(node string) []*v1.Pod
 	podUsage(pod *v1.Pod) (api.ReferencedResourceList, error)
@@ -79,7 +105,7 @@ func (s *requestedUsageClient) podUsage(pod *v1.Pod) (api.ReferencedResourceList
 	return usage, nil
 }
 
-func (s *requestedUsageClient) sync(nodes []*v1.Node) error {
+func (s *requestedUsageClient) sync(ctx context.Context, nodes []*v1.Node) error {
 	s._nodeUtilization = make(map[string]api.ReferencedResourceList)
 	s._pods = make(map[string][]*v1.Pod)
 
@@ -165,7 +191,7 @@ func (client *actualUsageClient) podUsage(pod *v1.Pod) (api.ReferencedResourceLi
 	return totalUsage, nil
 }
 
-func (client *actualUsageClient) sync(nodes []*v1.Node) error {
+func (client *actualUsageClient) sync(ctx context.Context, nodes []*v1.Node) error {
 	client._nodeUtilization = make(map[string]api.ReferencedResourceList)
 	client._pods = make(map[string][]*v1.Pod)
 
@@ -200,3 +226,95 @@ func (client *actualUsageClient) sync(nodes []*v1.Node) error {
 
 	return nil
 }
+
+type prometheusUsageClient struct {
+	getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc
+	promClient            promapi.Client
+	promQuery             string
+
+	_pods            map[string][]*v1.Pod
+	_nodeUtilization map[string]map[v1.ResourceName]*resource.Quantity
+}
+
+var _ usageClient = &prometheusUsageClient{}
+
+func newPrometheusUsageClient(
+	getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc,
+	promClient promapi.Client,
+	promQuery string,
+) *prometheusUsageClient {
+	return &prometheusUsageClient{
+		getPodsAssignedToNode: getPodsAssignedToNode,
+		promClient:            promClient,
+		promQuery:             promQuery,
+	}
+}
+
+func (client *prometheusUsageClient) nodeUtilization(node string) map[v1.ResourceName]*resource.Quantity {
+	return client._nodeUtilization[node]
+}
+
+func (client *prometheusUsageClient) pods(node string) []*v1.Pod {
+	return client._pods[node]
+}
+
+func (client *prometheusUsageClient) podUsage(pod *v1.Pod) 
(map[v1.ResourceName]*resource.Quantity, error) { + return nil, newNotSupportedError(prometheusUsageClientType) +} + +func NodeUsageFromPrometheusMetrics(ctx context.Context, promClient promapi.Client, promQuery string) (map[string]map[v1.ResourceName]*resource.Quantity, error) { + results, warnings, err := promv1.NewAPI(promClient).Query(ctx, promQuery, time.Now()) + if err != nil { + return nil, fmt.Errorf("unable to capture prometheus metrics: %v", err) + } + if len(warnings) > 0 { + klog.Infof("prometheus metrics warnings: %v", warnings) + } + + if results.Type() != model.ValVector { + return nil, fmt.Errorf("expected query results to be of type %q, got %q instead", model.ValVector, results.Type()) + } + + nodeUsages := make(map[string]map[v1.ResourceName]*resource.Quantity) + for _, sample := range results.(model.Vector) { + nodeName, exists := sample.Metric["instance"] + if !exists { + return nil, fmt.Errorf("The collected metrics sample is missing 'instance' key") + } + if sample.Value < 0 || sample.Value > 1 { + return nil, fmt.Errorf("The collected metrics sample for %q has value %v outside of <0; 1> interval", string(nodeName), sample.Value) + } + nodeUsages[string(nodeName)] = map[v1.ResourceName]*resource.Quantity{ + MetricResource: resource.NewQuantity(int64(sample.Value*100), resource.DecimalSI), + } + } + + return nodeUsages, nil +} + +func (client *prometheusUsageClient) sync(ctx context.Context, nodes []*v1.Node) error { + client._nodeUtilization = make(map[string]map[v1.ResourceName]*resource.Quantity) + client._pods = make(map[string][]*v1.Pod) + + nodeUsages, err := NodeUsageFromPrometheusMetrics(ctx, client.promClient, client.promQuery) + if err != nil { + return err + } + + for _, node := range nodes { + if _, exists := nodeUsages[node.Name]; !exists { + return fmt.Errorf("unable to find metric entry for %v", node.Name) + } + pods, err := podutil.ListPodsOnANode(node.Name, client.getPodsAssignedToNode, nil) + if err != nil { + 
klog.V(2).InfoS("Node will not be processed, error accessing its pods", "node", klog.KObj(node), "err", err) + return fmt.Errorf("error accessing %q node's pods: %v", node.Name, err) + } + + // store the snapshot of pods from the same (or the closest) node utilization computation + client._pods[node.Name] = pods + client._nodeUtilization[node.Name] = nodeUsages[node.Name] + } + + return nil +} diff --git a/pkg/framework/plugins/nodeutilization/usageclients_test.go b/pkg/framework/plugins/nodeutilization/usageclients_test.go index ac1886216..8f47e0b0c 100644 --- a/pkg/framework/plugins/nodeutilization/usageclients_test.go +++ b/pkg/framework/plugins/nodeutilization/usageclients_test.go @@ -18,9 +18,14 @@ package nodeutilization import ( "context" + "encoding/json" "fmt" + "net/http" + "net/url" "testing" + "github.com/prometheus/common/model" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/labels" @@ -58,9 +63,9 @@ func updateMetricsAndCheckNodeUtilization( if err != nil { t.Fatalf("failed to capture metrics: %v", err) } - err = usageClient.sync(nodes) + err = usageClient.sync(ctx, nodes) if err != nil { - t.Fatalf("failed to capture a snapshot: %v", err) + t.Fatalf("failed to sync a snapshot: %v", err) } nodeUtilization := usageClient.nodeUtilization(nodeName) t.Logf("current node cpu usage: %v\n", nodeUtilization[v1.ResourceCPU].MilliValue()) @@ -137,3 +142,158 @@ func TestActualUsageClient(t *testing.T) { metricsClientset, collector, usageClient, nodes, n2.Name, n2metrics, ) } + +type fakePromClient struct { + result interface{} + dataType model.ValueType +} + +type fakePayload struct { + Status string `json:"status"` + Data queryResult `json:"data"` +} + +type queryResult struct { + Type model.ValueType `json:"resultType"` + Result interface{} `json:"result"` +} + +func (client *fakePromClient) URL(ep string, args map[string]string) *url.URL { + return &url.URL{} +} + +func (client *fakePromClient) Do(ctx 
context.Context, request *http.Request) (*http.Response, []byte, error) { + jsonData, err := json.Marshal(fakePayload{ + Status: "success", + Data: queryResult{ + Type: client.dataType, + Result: client.result, + }, + }) + + return &http.Response{StatusCode: 200}, jsonData, err +} + +func sample(metricName, nodeName string, value float64) *model.Sample { + return &model.Sample{ + Metric: model.Metric{ + "__name__": model.LabelValue(metricName), + "instance": model.LabelValue(nodeName), + }, + Value: model.SampleValue(value), + Timestamp: 1728991761711, + } +} + +func TestPrometheusUsageClient(t *testing.T) { + n1 := test.BuildTestNode("ip-10-0-17-165.ec2.internal", 2000, 3000, 10, nil) + n2 := test.BuildTestNode("ip-10-0-51-101.ec2.internal", 2000, 3000, 10, nil) + n3 := test.BuildTestNode("ip-10-0-94-25.ec2.internal", 2000, 3000, 10, nil) + + nodes := []*v1.Node{n1, n2, n3} + + p1 := test.BuildTestPod("p1", 400, 0, n1.Name, nil) + p21 := test.BuildTestPod("p21", 400, 0, n2.Name, nil) + p22 := test.BuildTestPod("p22", 400, 0, n2.Name, nil) + p3 := test.BuildTestPod("p3", 400, 0, n3.Name, nil) + + tests := []struct { + name string + result interface{} + dataType model.ValueType + nodeUsage map[string]int64 + err error + }{ + { + name: "valid data", + dataType: model.ValVector, + result: model.Vector{ + sample("instance:node_cpu:rate:sum", "ip-10-0-51-101.ec2.internal", 0.20381818181818104), + sample("instance:node_cpu:rate:sum", "ip-10-0-17-165.ec2.internal", 0.4245454545454522), + sample("instance:node_cpu:rate:sum", "ip-10-0-94-25.ec2.internal", 0.5695757575757561), + }, + nodeUsage: map[string]int64{ + "ip-10-0-51-101.ec2.internal": 20, + "ip-10-0-17-165.ec2.internal": 42, + "ip-10-0-94-25.ec2.internal": 56, + }, + }, + { + name: "invalid data missing instance label", + dataType: model.ValVector, + result: model.Vector{ + &model.Sample{ + Metric: model.Metric{ + "__name__": model.LabelValue("instance:node_cpu:rate:sum"), + }, + Value: 
model.SampleValue(0.20381818181818104), + Timestamp: 1728991761711, + }, + }, + err: fmt.Errorf("The collected metrics sample is missing 'instance' key"), + }, + { + name: "invalid data value out of range", + dataType: model.ValVector, + result: model.Vector{ + sample("instance:node_cpu:rate:sum", "ip-10-0-51-101.ec2.internal", 1.20381818181818104), + }, + err: fmt.Errorf("The collected metrics sample for \"ip-10-0-51-101.ec2.internal\" has value 1.203818181818181 outside of <0; 1> interval"), + }, + { + name: "invalid data not a vector", + dataType: model.ValScalar, + result: model.Scalar{ + Value: model.SampleValue(0.20381818181818104), + Timestamp: 1728991761711, + }, + err: fmt.Errorf("expected query results to be of type \"vector\", got \"scalar\" instead"), + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + pClient := &fakePromClient{ + result: tc.result, + dataType: tc.dataType, + } + + clientset := fakeclientset.NewSimpleClientset(n1, n2, n3, p1, p21, p22, p3) + + ctx := context.TODO() + sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0) + podInformer := sharedInformerFactory.Core().V1().Pods().Informer() + podsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer) + if err != nil { + t.Fatalf("Build get pods assigned to node function error: %v", err) + } + + sharedInformerFactory.Start(ctx.Done()) + sharedInformerFactory.WaitForCacheSync(ctx.Done()) + + prometheusUsageClient := newPrometheusUsageClient(podsAssignedToNode, pClient, "instance:node_cpu:rate:sum") + err = prometheusUsageClient.sync(ctx, nodes) + if tc.err == nil { + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + } else { + if err == nil { + t.Fatalf("unexpected %q error, got nil instead", tc.err) + } else if err.Error() != tc.err.Error() { + t.Fatalf("expected %q error, got %q instead", tc.err, err) + } + return + } + + for _, node := range nodes { + nodeUtil := 
prometheusUsageClient.nodeUtilization(node.Name) + if nodeUtil[MetricResource].Value() != tc.nodeUsage[node.Name] { + t.Fatalf("expected %q node utilization to be %v, got %v instead", node.Name, tc.nodeUsage[node.Name], nodeUtil[MetricResource]) + } else { + t.Logf("%v node utilization: %v", node.Name, nodeUtil[MetricResource]) + } + } + }) + } +} diff --git a/pkg/framework/plugins/nodeutilization/validation.go b/pkg/framework/plugins/nodeutilization/validation.go index 5df502483..1392dda26 100644 --- a/pkg/framework/plugins/nodeutilization/validation.go +++ b/pkg/framework/plugins/nodeutilization/validation.go @@ -44,6 +44,17 @@ func ValidateLowNodeUtilizationArgs(obj runtime.Object) error { if err != nil { return err } + if args.MetricsUtilization != nil { + if args.MetricsUtilization.Source == api.KubernetesMetrics && args.MetricsUtilization.MetricsServer { + return fmt.Errorf("it is not allowed to set both %q source and metricsServer", api.KubernetesMetrics) + } + if args.MetricsUtilization.Source == api.KubernetesMetrics && args.MetricsUtilization.Prometheus != nil { + return fmt.Errorf("prometheus configuration is not allowed to set when source is set to %q", api.KubernetesMetrics) + } + if args.MetricsUtilization.Source == api.PrometheusMetrics && (args.MetricsUtilization.Prometheus == nil || args.MetricsUtilization.Prometheus.Query == "") { + return fmt.Errorf("prometheus query is required when metrics source is set to %q", api.PrometheusMetrics) + } + } return nil } diff --git a/pkg/framework/plugins/nodeutilization/validation_test.go b/pkg/framework/plugins/nodeutilization/validation_test.go index b9362d33b..f695a1584 100644 --- a/pkg/framework/plugins/nodeutilization/validation_test.go +++ b/pkg/framework/plugins/nodeutilization/validation_test.go @@ -183,6 +183,65 @@ func TestValidateLowNodeUtilizationPluginConfig(t *testing.T) { }, errInfo: nil, }, + { + name: "setting both kubernetes metrics source and metricsserver", + args: &LowNodeUtilizationArgs{ 
+ Thresholds: api.ResourceThresholds{ + v1.ResourceCPU: 20, + v1.ResourceMemory: 20, + extendedResource: 20, + }, + TargetThresholds: api.ResourceThresholds{ + v1.ResourceCPU: 80, + v1.ResourceMemory: 80, + extendedResource: 80, + }, + MetricsUtilization: &MetricsUtilization{ + MetricsServer: true, + Source: api.KubernetesMetrics, + }, + }, + errInfo: fmt.Errorf("it is not allowed to set both \"KubernetesMetrics\" source and metricsServer"), + }, + { + name: "missing prometheus query", + args: &LowNodeUtilizationArgs{ + Thresholds: api.ResourceThresholds{ + v1.ResourceCPU: 20, + v1.ResourceMemory: 20, + extendedResource: 20, + }, + TargetThresholds: api.ResourceThresholds{ + v1.ResourceCPU: 80, + v1.ResourceMemory: 80, + extendedResource: 80, + }, + MetricsUtilization: &MetricsUtilization{ + Source: api.PrometheusMetrics, + }, + }, + errInfo: fmt.Errorf("prometheus query is required when metrics source is set to \"Prometheus\""), + }, + { + name: "prometheus set when source set to kubernetes metrics", + args: &LowNodeUtilizationArgs{ + Thresholds: api.ResourceThresholds{ + v1.ResourceCPU: 20, + v1.ResourceMemory: 20, + extendedResource: 20, + }, + TargetThresholds: api.ResourceThresholds{ + v1.ResourceCPU: 80, + v1.ResourceMemory: 80, + extendedResource: 80, + }, + MetricsUtilization: &MetricsUtilization{ + Source: api.KubernetesMetrics, + Prometheus: &Prometheus{}, + }, + }, + errInfo: fmt.Errorf("prometheus configuration is not allowed to set when source is set to \"KubernetesMetrics\""), + }, } for _, testCase := range tests { @@ -190,10 +249,10 @@ func TestValidateLowNodeUtilizationPluginConfig(t *testing.T) { validateErr := ValidateLowNodeUtilizationArgs(runtime.Object(testCase.args)) if validateErr == nil || testCase.errInfo == nil { if validateErr != testCase.errInfo { - t.Errorf("expected validity of plugin config: %v but got %v instead", testCase.errInfo, validateErr) + t.Errorf("expected validity of plugin config: %q but got %q instead", testCase.errInfo, 
validateErr) } } else if validateErr.Error() != testCase.errInfo.Error() { - t.Errorf("expected validity of plugin config: %v but got %v instead", testCase.errInfo, validateErr) + t.Errorf("expected validity of plugin config: %q but got %q instead", testCase.errInfo, validateErr) } }) } diff --git a/pkg/framework/plugins/nodeutilization/zz_generated.deepcopy.go b/pkg/framework/plugins/nodeutilization/zz_generated.deepcopy.go index 36b90743c..9a1d9c5c7 100644 --- a/pkg/framework/plugins/nodeutilization/zz_generated.deepcopy.go +++ b/pkg/framework/plugins/nodeutilization/zz_generated.deepcopy.go @@ -84,7 +84,7 @@ func (in *LowNodeUtilizationArgs) DeepCopyInto(out *LowNodeUtilizationArgs) { if in.MetricsUtilization != nil { in, out := &in.MetricsUtilization, &out.MetricsUtilization *out = new(MetricsUtilization) - **out = **in + (*in).DeepCopyInto(*out) } if in.EvictableNamespaces != nil { in, out := &in.EvictableNamespaces, &out.EvictableNamespaces @@ -116,3 +116,24 @@ func (in *LowNodeUtilizationArgs) DeepCopyObject() runtime.Object { } return nil } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *MetricsUtilization) DeepCopyInto(out *MetricsUtilization) { + *out = *in + if in.Prometheus != nil { + in, out := &in.Prometheus, &out.Prometheus + *out = new(Prometheus) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MetricsUtilization. 
+func (in *MetricsUtilization) DeepCopy() *MetricsUtilization { + if in == nil { + return nil + } + out := new(MetricsUtilization) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/framework/profile/profile.go b/pkg/framework/profile/profile.go index a63613fda..16d81d738 100644 --- a/pkg/framework/profile/profile.go +++ b/pkg/framework/profile/profile.go @@ -18,6 +18,7 @@ import ( "fmt" "time" + promapi "github.com/prometheus/client_golang/api" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -68,6 +69,7 @@ func (ei *evictorImpl) Evict(ctx context.Context, pod *v1.Pod, opts evictions.Ev // handleImpl implements the framework handle which gets passed to plugins type handleImpl struct { clientSet clientset.Interface + prometheusClient promapi.Client metricsCollector *metricscollector.MetricsCollector getPodsAssignedToNodeFunc podutil.GetPodsAssignedToNodeFunc sharedInformerFactory informers.SharedInformerFactory @@ -81,6 +83,10 @@ func (hi *handleImpl) ClientSet() clientset.Interface { return hi.clientSet } +func (hi *handleImpl) PrometheusClient() promapi.Client { + return hi.prometheusClient +} + func (hi *handleImpl) MetricsCollector() *metricscollector.MetricsCollector { return hi.metricsCollector } @@ -131,6 +137,7 @@ type Option func(*handleImplOpts) type handleImplOpts struct { clientSet clientset.Interface + prometheusClient promapi.Client sharedInformerFactory informers.SharedInformerFactory getPodsAssignedToNodeFunc podutil.GetPodsAssignedToNodeFunc podEvictor *evictions.PodEvictor @@ -144,6 +151,13 @@ func WithClientSet(clientSet clientset.Interface) Option { } } +// WithPrometheusClient sets Prometheus client for the scheduling frameworkImpl. 
+func WithPrometheusClient(prometheusClient promapi.Client) Option { + return func(o *handleImplOpts) { + o.prometheusClient = prometheusClient + } +} + func WithSharedInformerFactory(sharedInformerFactory informers.SharedInformerFactory) Option { return func(o *handleImplOpts) { o.sharedInformerFactory = sharedInformerFactory @@ -267,6 +281,7 @@ func NewProfile(config api.DeschedulerProfile, reg pluginregistry.Registry, opts podEvictor: hOpts.podEvictor, }, metricsCollector: hOpts.metricsCollector, + prometheusClient: hOpts.prometheusClient, } pluginNames := append(config.Plugins.Deschedule.Enabled, config.Plugins.Balance.Enabled...) diff --git a/pkg/framework/types/types.go b/pkg/framework/types/types.go index 6cb95ca24..2480e06b0 100644 --- a/pkg/framework/types/types.go +++ b/pkg/framework/types/types.go @@ -26,6 +26,8 @@ import ( "sigs.k8s.io/descheduler/pkg/descheduler/evictions" "sigs.k8s.io/descheduler/pkg/descheduler/metricscollector" podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod" + + promapi "github.com/prometheus/client_golang/api" ) // Handle provides handles used by plugins to retrieve a kubernetes client set, @@ -34,6 +36,7 @@ import ( type Handle interface { // ClientSet returns a kubernetes clientSet. ClientSet() clientset.Interface + PrometheusClient() promapi.Client Evictor() Evictor GetPodsAssignedToNodeFunc() podutil.GetPodsAssignedToNodeFunc SharedInformerFactory() informers.SharedInformerFactory