1
0
mirror of https://github.com/kubernetes-sigs/descheduler.git synced 2026-01-26 05:14:13 +01:00

[nodeutilization]: prometheus usage client with prometheus metrics

This commit is contained in:
Jan Chaloupka
2024-11-07 16:32:06 +01:00
parent aed345994f
commit e283c31030
26 changed files with 1541 additions and 85 deletions

View File

@@ -129,14 +129,18 @@ These are top level keys in the Descheduler Policy that you can use to configure
| `metricsProviders` | `[]object` | `nil` | Enables various metrics providers like Kubernetes [Metrics Server](https://kubernetes-sigs.github.io/metrics-server/) |
| `evictionFailureEventNotification` | `bool` | `false` | Enables eviction failure event notification. |
| `gracePeriodSeconds` | `int` | `0` | The duration in seconds before the object should be deleted. The value zero indicates delete immediately. |
| `prometheus` |`object`| `nil` | Configures collection of Prometheus metrics for actual resource utilization |
| `prometheus.url` |`string`| `nil` | Points to a Prometheus server url |
| `prometheus.authToken` |`object`| `nil` | Sets Prometheus server authentication token. If not specified in cluster authentication token from the container's file system is read. |
| `prometheus.authToken.secretReference` |`object`| `nil` | Read the authentication token from a kubernetes secret (the secret is expected to contain the token under `prometheusAuthToken` data key) |
| `prometheus.authToken.secretReference.namespace` |`string`| `nil` | Authentication token kubernetes secret namespace (currently, the RBAC configuration permits retrieving secrets from the `kube-system` namespace. If the secret needs to be accessed from a different namespace, the existing RBAC rules must be explicitly extended. |
| `prometheus.authToken.secretReference.name` |`string`| `nil` | Authentication token kubernetes secret name |
The descheduler currently allows to configure a metric collection of Kubernetes Metrics through `metricsProviders` field.
The previous way of setting `metricsCollector` field is deprecated. There is currently one source to configure:
```
metricsProviders:
- source: KubernetesMetrics
```
The list can be extended with other metrics providers in the future.
The previous way of setting `metricsCollector` field is deprecated. There are currently two sources to configure:
- `KubernetesMetrics`: enables metrics collection from Kubernetes Metrics server
- `Prometheus`: enables metrics collection from Prometheus server
In general, each plugin can consume metrics from a different provider so multiple distinct providers can be configured in parallel.
@@ -174,9 +178,15 @@ maxNoOfPodsToEvictPerNode: 5000 # you don't need to set this, unlimited if not s
maxNoOfPodsToEvictPerNamespace: 5000 # you don't need to set this, unlimited if not set
maxNoOfPodsToEvictTotal: 5000 # you don't need to set this, unlimited if not set
gracePeriodSeconds: 60 # you don't need to set this, 0 if not set
# you don't need to set this, Kubernetes metrics are not collected if not set
# you don't need to set this, metrics are not collected if not set
metricsProviders:
- source: KubernetesMetrics
- source: Prometheus
prometheus:
url: http://prometheus-kube-prometheus-prometheus.prom.svc.cluster.local
authToken:
secretReference:
namespace: "kube-system"
name: "authtoken"
profiles:
- name: ProfileName
pluginConfig:
@@ -303,6 +313,10 @@ like `kubectl top`) may differ from the calculated consumption, due to these com
actual usage metrics. Metrics-based descheduling can be enabled by setting `metricsUtilization.metricsServer` field (deprecated)
or `metricsUtilization.source` field to `KubernetesMetrics`.
In order to have the plugin consume the metrics the metric provider needs to be configured as well.
Alternatively, it is possible to create a prometheus client and configure a prometheus query to consume
metrics outside of the kubernetes metrics server. The query is expected to return a vector of values for
each node. The values are expected to be any real number within <0; 1> interval. During eviction only
a single pod is evicted at most from each overutilized node. There's currently no support for evicting more.
See `metricsProviders` field at [Top Level configuration](#top-level-configuration) for available options.
**Parameters:**
@@ -318,6 +332,7 @@ See `metricsProviders` field at [Top Level configuration](#top-level-configurati
|`metricsUtilization`|object|
|`metricsUtilization.metricsServer` (deprecated)|bool|
|`metricsUtilization.source`|string|
|`metricsUtilization.prometheus.query`|string|
**Example:**
@@ -338,8 +353,10 @@ profiles:
"cpu" : 50
"memory": 50
"pods": 50
metricsUtilization:
source: KubernetesMetrics
# metricsUtilization:
# source: Prometheus
# prometheus:
# query: instance:node_cpu:rate:sum
evictionLimits:
node: 5
plugins:

View File

@@ -21,6 +21,7 @@ import (
"strings"
"time"
promapi "github.com/prometheus/client_golang/api"
"github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -54,6 +55,7 @@ type DeschedulerServer struct {
Client clientset.Interface
EventClient clientset.Interface
MetricsClient metricsclient.Interface
PrometheusClient promapi.Client
SecureServing *apiserveroptions.SecureServingOptionsWithLoopback
SecureServingInfo *apiserver.SecureServingInfo
DisableMetrics bool

View File

@@ -36,6 +36,15 @@ rules:
resources: ["nodes", "pods"]
verbs: ["get", "list"]
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: descheduler-role
rules:
- apiGroups: [""]
resources: ["secrets"]
verbs: ["get", "list", "watch"]
---
apiVersion: v1
kind: ServiceAccount
metadata:
@@ -54,3 +63,16 @@ subjects:
- name: descheduler-sa
kind: ServiceAccount
namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: descheduler-role-binding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: descheduler-role
subjects:
- name: descheduler-sa
kind: ServiceAccount
namespace: kube-system

View File

@@ -115,6 +115,9 @@ const (
// KubernetesMetrics enables metrics from a Kubernetes metrics server.
// Please see https://kubernetes-sigs.github.io/metrics-server/ for more.
KubernetesMetrics MetricsSource = "KubernetesMetrics"
// KubernetesMetrics enables metrics from a Prometheus metrics server.
PrometheusMetrics MetricsSource = "Prometheus"
)
// MetricsCollector configures collection of metrics about actual resource utilization
@@ -128,7 +131,32 @@ type MetricsCollector struct {
type MetricsProvider struct {
// Source enables metrics from Kubernetes metrics server.
Source MetricsSource
// Prometheus enables metrics collection through Prometheus
Prometheus *Prometheus
}
// ReferencedResourceList is an adaption of v1.ResourceList with resources as references
type ReferencedResourceList = map[v1.ResourceName]*resource.Quantity
type Prometheus struct {
URL string
// authToken used for authentication with the prometheus server.
// If not set the in cluster authentication token for the descheduler service
// account is read from the container's file system.
AuthToken *AuthToken
}
type AuthToken struct {
// secretReference references an authentication token.
// secrets are expected to be created under the descheduler's namespace.
SecretReference *SecretReference
}
// SecretReference holds a reference to a Secret
type SecretReference struct {
// namespace is the namespace of the secret.
Namespace string
// name is the name of the secret.
Name string
}

View File

@@ -90,6 +90,9 @@ const (
// KubernetesMetrics enables metrics from a Kubernetes metrics server.
// Please see https://kubernetes-sigs.github.io/metrics-server/ for more.
KubernetesMetrics MetricsSource = "KubernetesMetrics"
// KubernetesMetrics enables metrics from a Prometheus metrics server.
PrometheusMetrics MetricsSource = "Prometheus"
)
// MetricsCollector configures collection of metrics about actual resource utilization
@@ -103,4 +106,29 @@ type MetricsCollector struct {
type MetricsProvider struct {
// Source enables metrics from Kubernetes metrics server.
Source MetricsSource `json:"source,omitempty"`
// Prometheus enables metrics collection through Prometheus
Prometheus *Prometheus `json:"prometheus,omitempty"`
}
type Prometheus struct {
URL string `json:"url,omitempty"`
// authToken used for authentication with the prometheus server.
// If not set the in cluster authentication token for the descheduler service
// account is read from the container's file system.
AuthToken *AuthToken `json:"authToken,omitempty"`
}
type AuthToken struct {
// secretReference references an authentication token.
// secrets are expected to be created under the descheduler's namespace.
SecretReference *SecretReference `json:"secretReference,omitempty"`
}
// SecretReference holds a reference to a Secret
type SecretReference struct {
// namespace is the namespace of the secret.
Namespace string `json:"namespace,omitempty"`
// name is the name of the secret.
Name string `json:"name,omitempty"`
}

View File

@@ -36,6 +36,16 @@ func init() {
// RegisterConversions adds conversion functions to the given scheme.
// Public to allow building arbitrary schemes.
func RegisterConversions(s *runtime.Scheme) error {
if err := s.AddGeneratedConversionFunc((*AuthToken)(nil), (*api.AuthToken)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1alpha2_AuthToken_To_api_AuthToken(a.(*AuthToken), b.(*api.AuthToken), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*api.AuthToken)(nil), (*AuthToken)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_api_AuthToken_To_v1alpha2_AuthToken(a.(*api.AuthToken), b.(*AuthToken), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*DeschedulerProfile)(nil), (*api.DeschedulerProfile)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1alpha2_DeschedulerProfile_To_api_DeschedulerProfile(a.(*DeschedulerProfile), b.(*api.DeschedulerProfile), scope)
}); err != nil {
@@ -91,6 +101,26 @@ func RegisterConversions(s *runtime.Scheme) error {
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*Prometheus)(nil), (*api.Prometheus)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1alpha2_Prometheus_To_api_Prometheus(a.(*Prometheus), b.(*api.Prometheus), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*api.Prometheus)(nil), (*Prometheus)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_api_Prometheus_To_v1alpha2_Prometheus(a.(*api.Prometheus), b.(*Prometheus), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*SecretReference)(nil), (*api.SecretReference)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1alpha2_SecretReference_To_api_SecretReference(a.(*SecretReference), b.(*api.SecretReference), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*api.SecretReference)(nil), (*SecretReference)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_api_SecretReference_To_v1alpha2_SecretReference(a.(*api.SecretReference), b.(*SecretReference), scope)
}); err != nil {
return err
}
if err := s.AddConversionFunc((*api.DeschedulerPolicy)(nil), (*DeschedulerPolicy)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_api_DeschedulerPolicy_To_v1alpha2_DeschedulerPolicy(a.(*api.DeschedulerPolicy), b.(*DeschedulerPolicy), scope)
}); err != nil {
@@ -109,6 +139,26 @@ func RegisterConversions(s *runtime.Scheme) error {
return nil
}
func autoConvert_v1alpha2_AuthToken_To_api_AuthToken(in *AuthToken, out *api.AuthToken, s conversion.Scope) error {
out.SecretReference = (*api.SecretReference)(unsafe.Pointer(in.SecretReference))
return nil
}
// Convert_v1alpha2_AuthToken_To_api_AuthToken is an autogenerated conversion function.
func Convert_v1alpha2_AuthToken_To_api_AuthToken(in *AuthToken, out *api.AuthToken, s conversion.Scope) error {
return autoConvert_v1alpha2_AuthToken_To_api_AuthToken(in, out, s)
}
func autoConvert_api_AuthToken_To_v1alpha2_AuthToken(in *api.AuthToken, out *AuthToken, s conversion.Scope) error {
out.SecretReference = (*SecretReference)(unsafe.Pointer(in.SecretReference))
return nil
}
// Convert_api_AuthToken_To_v1alpha2_AuthToken is an autogenerated conversion function.
func Convert_api_AuthToken_To_v1alpha2_AuthToken(in *api.AuthToken, out *AuthToken, s conversion.Scope) error {
return autoConvert_api_AuthToken_To_v1alpha2_AuthToken(in, out, s)
}
func autoConvert_v1alpha2_DeschedulerPolicy_To_api_DeschedulerPolicy(in *DeschedulerPolicy, out *api.DeschedulerPolicy, s conversion.Scope) error {
if in.Profiles != nil {
in, out := &in.Profiles, &out.Profiles
@@ -225,6 +275,7 @@ func Convert_api_MetricsCollector_To_v1alpha2_MetricsCollector(in *api.MetricsCo
func autoConvert_v1alpha2_MetricsProvider_To_api_MetricsProvider(in *MetricsProvider, out *api.MetricsProvider, s conversion.Scope) error {
out.Source = api.MetricsSource(in.Source)
out.Prometheus = (*api.Prometheus)(unsafe.Pointer(in.Prometheus))
return nil
}
@@ -235,6 +286,7 @@ func Convert_v1alpha2_MetricsProvider_To_api_MetricsProvider(in *MetricsProvider
func autoConvert_api_MetricsProvider_To_v1alpha2_MetricsProvider(in *api.MetricsProvider, out *MetricsProvider, s conversion.Scope) error {
out.Source = MetricsSource(in.Source)
out.Prometheus = (*Prometheus)(unsafe.Pointer(in.Prometheus))
return nil
}
@@ -339,3 +391,47 @@ func autoConvert_api_Plugins_To_v1alpha2_Plugins(in *api.Plugins, out *Plugins,
func Convert_api_Plugins_To_v1alpha2_Plugins(in *api.Plugins, out *Plugins, s conversion.Scope) error {
return autoConvert_api_Plugins_To_v1alpha2_Plugins(in, out, s)
}
func autoConvert_v1alpha2_Prometheus_To_api_Prometheus(in *Prometheus, out *api.Prometheus, s conversion.Scope) error {
out.URL = in.URL
out.AuthToken = (*api.AuthToken)(unsafe.Pointer(in.AuthToken))
return nil
}
// Convert_v1alpha2_Prometheus_To_api_Prometheus is an autogenerated conversion function.
func Convert_v1alpha2_Prometheus_To_api_Prometheus(in *Prometheus, out *api.Prometheus, s conversion.Scope) error {
return autoConvert_v1alpha2_Prometheus_To_api_Prometheus(in, out, s)
}
func autoConvert_api_Prometheus_To_v1alpha2_Prometheus(in *api.Prometheus, out *Prometheus, s conversion.Scope) error {
out.URL = in.URL
out.AuthToken = (*AuthToken)(unsafe.Pointer(in.AuthToken))
return nil
}
// Convert_api_Prometheus_To_v1alpha2_Prometheus is an autogenerated conversion function.
func Convert_api_Prometheus_To_v1alpha2_Prometheus(in *api.Prometheus, out *Prometheus, s conversion.Scope) error {
return autoConvert_api_Prometheus_To_v1alpha2_Prometheus(in, out, s)
}
func autoConvert_v1alpha2_SecretReference_To_api_SecretReference(in *SecretReference, out *api.SecretReference, s conversion.Scope) error {
out.Namespace = in.Namespace
out.Name = in.Name
return nil
}
// Convert_v1alpha2_SecretReference_To_api_SecretReference is an autogenerated conversion function.
func Convert_v1alpha2_SecretReference_To_api_SecretReference(in *SecretReference, out *api.SecretReference, s conversion.Scope) error {
return autoConvert_v1alpha2_SecretReference_To_api_SecretReference(in, out, s)
}
func autoConvert_api_SecretReference_To_v1alpha2_SecretReference(in *api.SecretReference, out *SecretReference, s conversion.Scope) error {
out.Namespace = in.Namespace
out.Name = in.Name
return nil
}
// Convert_api_SecretReference_To_v1alpha2_SecretReference is an autogenerated conversion function.
func Convert_api_SecretReference_To_v1alpha2_SecretReference(in *api.SecretReference, out *SecretReference, s conversion.Scope) error {
return autoConvert_api_SecretReference_To_v1alpha2_SecretReference(in, out, s)
}

View File

@@ -25,6 +25,27 @@ import (
runtime "k8s.io/apimachinery/pkg/runtime"
)
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *AuthToken) DeepCopyInto(out *AuthToken) {
*out = *in
if in.SecretReference != nil {
in, out := &in.SecretReference, &out.SecretReference
*out = new(SecretReference)
**out = **in
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AuthToken.
func (in *AuthToken) DeepCopy() *AuthToken {
if in == nil {
return nil
}
out := new(AuthToken)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DeschedulerPolicy) DeepCopyInto(out *DeschedulerPolicy) {
*out = *in
@@ -69,7 +90,9 @@ func (in *DeschedulerPolicy) DeepCopyInto(out *DeschedulerPolicy) {
if in.MetricsProviders != nil {
in, out := &in.MetricsProviders, &out.MetricsProviders
*out = make([]MetricsProvider, len(*in))
copy(*out, *in)
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.GracePeriodSeconds != nil {
in, out := &in.GracePeriodSeconds, &out.GracePeriodSeconds
@@ -140,6 +163,11 @@ func (in *MetricsCollector) DeepCopy() *MetricsCollector {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *MetricsProvider) DeepCopyInto(out *MetricsProvider) {
*out = *in
if in.Prometheus != nil {
in, out := &in.Prometheus, &out.Prometheus
*out = new(Prometheus)
(*in).DeepCopyInto(*out)
}
return
}
@@ -217,3 +245,40 @@ func (in *Plugins) DeepCopy() *Plugins {
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Prometheus) DeepCopyInto(out *Prometheus) {
*out = *in
if in.AuthToken != nil {
in, out := &in.AuthToken, &out.AuthToken
*out = new(AuthToken)
(*in).DeepCopyInto(*out)
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Prometheus.
func (in *Prometheus) DeepCopy() *Prometheus {
if in == nil {
return nil
}
out := new(Prometheus)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *SecretReference) DeepCopyInto(out *SecretReference) {
*out = *in
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SecretReference.
func (in *SecretReference) DeepCopy() *SecretReference {
if in == nil {
return nil
}
out := new(SecretReference)
in.DeepCopyInto(out)
return out
}

View File

@@ -25,6 +25,27 @@ import (
runtime "k8s.io/apimachinery/pkg/runtime"
)
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *AuthToken) DeepCopyInto(out *AuthToken) {
*out = *in
if in.SecretReference != nil {
in, out := &in.SecretReference, &out.SecretReference
*out = new(SecretReference)
**out = **in
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AuthToken.
func (in *AuthToken) DeepCopy() *AuthToken {
if in == nil {
return nil
}
out := new(AuthToken)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DeschedulerPolicy) DeepCopyInto(out *DeschedulerPolicy) {
*out = *in
@@ -69,7 +90,9 @@ func (in *DeschedulerPolicy) DeepCopyInto(out *DeschedulerPolicy) {
if in.MetricsProviders != nil {
in, out := &in.MetricsProviders, &out.MetricsProviders
*out = make([]MetricsProvider, len(*in))
copy(*out, *in)
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.GracePeriodSeconds != nil {
in, out := &in.GracePeriodSeconds, &out.GracePeriodSeconds
@@ -161,6 +184,11 @@ func (in *MetricsCollector) DeepCopy() *MetricsCollector {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *MetricsProvider) DeepCopyInto(out *MetricsProvider) {
*out = *in
if in.Prometheus != nil {
in, out := &in.Prometheus, &out.Prometheus
*out = new(Prometheus)
(*in).DeepCopyInto(*out)
}
return
}
@@ -288,6 +316,27 @@ func (in *PriorityThreshold) DeepCopy() *PriorityThreshold {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Prometheus) DeepCopyInto(out *Prometheus) {
*out = *in
if in.AuthToken != nil {
in, out := &in.AuthToken, &out.AuthToken
*out = new(AuthToken)
(*in).DeepCopyInto(*out)
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Prometheus.
func (in *Prometheus) DeepCopy() *Prometheus {
if in == nil {
return nil
}
out := new(Prometheus)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in ResourceThresholds) DeepCopyInto(out *ResourceThresholds) {
{
@@ -309,3 +358,19 @@ func (in ResourceThresholds) DeepCopy() ResourceThresholds {
in.DeepCopyInto(out)
return *out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *SecretReference) DeepCopyInto(out *SecretReference) {
*out = *in
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SecretReference.
func (in *SecretReference) DeepCopy() *SecretReference {
if in == nil {
return nil
}
out := new(SecretReference)
in.DeepCopyInto(out)
return out
}

View File

@@ -17,17 +17,30 @@ limitations under the License.
package client
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"time"
promapi "github.com/prometheus/client_golang/api"
"github.com/prometheus/common/config"
// Ensure to load all auth plugins.
clientset "k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/transport"
componentbaseconfig "k8s.io/component-base/config"
metricsclient "k8s.io/metrics/pkg/client/clientset/versioned"
)
var K8sPodCAFilePath = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
func createConfig(clientConnection componentbaseconfig.ClientConnectionConfiguration, userAgt string) (*rest.Config, error) {
var cfg *rest.Config
if len(clientConnection.Kubeconfig) != 0 {
@@ -94,3 +107,61 @@ func GetMasterFromKubeconfig(filename string) (string, error) {
}
return "", fmt.Errorf("failed to get master address from kubeconfig: cluster information not found")
}
func loadCAFile(filepath string) (*x509.CertPool, error) {
caCert, err := ioutil.ReadFile(filepath)
if err != nil {
return nil, err
}
caCertPool := x509.NewCertPool()
if ok := caCertPool.AppendCertsFromPEM(caCert); !ok {
return nil, fmt.Errorf("failed to append CA certificate to the pool")
}
return caCertPool, nil
}
func CreatePrometheusClient(prometheusURL, authToken string) (promapi.Client, *http.Transport, error) {
// Retrieve Pod CA cert
caCertPool, err := loadCAFile(K8sPodCAFilePath)
if err != nil {
return nil, nil, fmt.Errorf("Error loading CA file: %v", err)
}
// Get Prometheus Host
u, err := url.Parse(prometheusURL)
if err != nil {
return nil, nil, fmt.Errorf("Error parsing prometheus URL: %v", err)
}
t := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
TLSClientConfig: &tls.Config{
RootCAs: caCertPool,
ServerName: u.Host,
},
}
roundTripper := transport.NewBearerAuthRoundTripper(
authToken,
t,
)
if authToken != "" {
client, err := promapi.NewClient(promapi.Config{
Address: prometheusURL,
RoundTripper: config.NewAuthorizationCredentialsRoundTripper("Bearer", config.NewInlineSecret(authToken), roundTripper),
})
return client, t, err
}
client, err := promapi.NewClient(promapi.Config{
Address: prometheusURL,
})
return client, t, err
}

View File

@@ -20,9 +20,12 @@ import (
"context"
"fmt"
"math"
"net/http"
"strconv"
"time"
promapi "github.com/prometheus/client_golang/api"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
@@ -30,18 +33,24 @@ import (
policy "k8s.io/api/policy/v1"
policyv1 "k8s.io/api/policy/v1"
schedulingv1 "k8s.io/api/scheduling/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
utilversion "k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
fakeclientset "k8s.io/client-go/kubernetes/fake"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/rest"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/util/workqueue"
componentbaseconfig "k8s.io/component-base/config"
"k8s.io/klog/v2"
@@ -62,6 +71,11 @@ import (
"sigs.k8s.io/descheduler/pkg/version"
)
const (
prometheusAuthTokenSecretKey = "prometheusAuthToken"
workQueueKey = "key"
)
type eprunner func(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status
type profileRunner struct {
@@ -70,15 +84,21 @@ type profileRunner struct {
}
type descheduler struct {
rs *options.DeschedulerServer
ir *informerResources
getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc
sharedInformerFactory informers.SharedInformerFactory
deschedulerPolicy *api.DeschedulerPolicy
eventRecorder events.EventRecorder
podEvictor *evictions.PodEvictor
podEvictionReactionFnc func(*fakeclientset.Clientset) func(action core.Action) (bool, runtime.Object, error)
metricsCollector *metricscollector.MetricsCollector
rs *options.DeschedulerServer
ir *informerResources
getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc
sharedInformerFactory informers.SharedInformerFactory
namespacedSecretsLister corev1listers.SecretNamespaceLister
deschedulerPolicy *api.DeschedulerPolicy
eventRecorder events.EventRecorder
podEvictor *evictions.PodEvictor
podEvictionReactionFnc func(*fakeclientset.Clientset) func(action core.Action) (bool, runtime.Object, error)
metricsCollector *metricscollector.MetricsCollector
prometheusClient promapi.Client
previousPrometheusClientTransport *http.Transport
queue workqueue.RateLimitingInterface
currentPrometheusAuthToken string
metricsProviders map[api.MetricsSource]*api.MetricsProvider
}
type informerResources struct {
@@ -125,8 +145,15 @@ func (ir *informerResources) CopyTo(fakeClient *fakeclientset.Clientset, newFact
return nil
}
func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string, eventRecorder events.EventRecorder, sharedInformerFactory informers.SharedInformerFactory,
) (*descheduler, error) {
func metricsProviderListToMap(providersList []api.MetricsProvider) map[api.MetricsSource]*api.MetricsProvider {
providersMap := make(map[api.MetricsSource]*api.MetricsProvider)
for _, provider := range providersList {
providersMap[provider.Source] = &provider
}
return providersMap
}
func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string, eventRecorder events.EventRecorder, sharedInformerFactory, namespacedSharedInformerFactory informers.SharedInformerFactory) (*descheduler, error) {
podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
ir := newInformerResources(sharedInformerFactory)
@@ -165,20 +192,7 @@ func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedu
return nil, err
}
var metricsCollector *metricscollector.MetricsCollector
if (deschedulerPolicy.MetricsCollector != nil && deschedulerPolicy.MetricsCollector.Enabled) || (len(deschedulerPolicy.MetricsProviders) > 0 && deschedulerPolicy.MetricsProviders[0].Source == api.KubernetesMetrics) {
nodeSelector := labels.Everything()
if deschedulerPolicy.NodeSelector != nil {
sel, err := labels.Parse(*deschedulerPolicy.NodeSelector)
if err != nil {
return nil, err
}
nodeSelector = sel
}
metricsCollector = metricscollector.NewMetricsCollector(sharedInformerFactory.Core().V1().Nodes().Lister(), rs.MetricsClient, nodeSelector)
}
return &descheduler{
desch := &descheduler{
rs: rs,
ir: ir,
getPodsAssignedToNode: getPodsAssignedToNode,
@@ -187,8 +201,148 @@ func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedu
eventRecorder: eventRecorder,
podEvictor: podEvictor,
podEvictionReactionFnc: podEvictionReactionFnc,
metricsCollector: metricsCollector,
}, nil
prometheusClient: rs.PrometheusClient,
queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{Name: "descheduler"}),
metricsProviders: metricsProviderListToMap(deschedulerPolicy.MetricsProviders),
}
if rs.MetricsClient != nil {
nodeSelector := labels.Everything()
if deschedulerPolicy.NodeSelector != nil {
sel, err := labels.Parse(*deschedulerPolicy.NodeSelector)
if err != nil {
return nil, err
}
nodeSelector = sel
}
desch.metricsCollector = metricscollector.NewMetricsCollector(sharedInformerFactory.Core().V1().Nodes().Lister(), rs.MetricsClient, nodeSelector)
}
prometheusProvider := desch.metricsProviders[api.PrometheusMetrics]
if prometheusProvider != nil && prometheusProvider.Prometheus != nil && prometheusProvider.Prometheus.AuthToken != nil {
authTokenSecret := prometheusProvider.Prometheus.AuthToken.SecretReference
if authTokenSecret == nil || authTokenSecret.Namespace == "" {
return nil, fmt.Errorf("prometheus metrics source configuration is missing authentication token secret")
}
if namespacedSharedInformerFactory == nil {
return nil, fmt.Errorf("namespacedSharedInformerFactory not configured")
}
namespacedSharedInformerFactory.Core().V1().Secrets().Informer().AddEventHandler(desch.eventHandler())
desch.namespacedSecretsLister = namespacedSharedInformerFactory.Core().V1().Secrets().Lister().Secrets(authTokenSecret.Namespace)
}
return desch, nil
}
func (d *descheduler) reconcileInClusterSAToken() error {
// Read the sa token and assume it has the sufficient permissions to authenticate
cfg, err := rest.InClusterConfig()
if err == nil {
if d.currentPrometheusAuthToken != cfg.BearerToken {
klog.V(2).Infof("Creating Prometheus client (with SA token)")
prometheusClient, transport, err := client.CreatePrometheusClient(d.metricsProviders[api.PrometheusMetrics].Prometheus.URL, cfg.BearerToken)
if err != nil {
return fmt.Errorf("unable to create a prometheus client: %v", err)
}
d.prometheusClient = prometheusClient
if d.previousPrometheusClientTransport != nil {
d.previousPrometheusClientTransport.CloseIdleConnections()
}
d.previousPrometheusClientTransport = transport
d.currentPrometheusAuthToken = cfg.BearerToken
}
return nil
}
if err == rest.ErrNotInCluster {
return nil
}
return fmt.Errorf("unexpected error when reading in cluster config: %v", err)
}
func (d *descheduler) runAuthenticationSecretReconciler(ctx context.Context) {
defer utilruntime.HandleCrash()
defer d.queue.ShutDown()
klog.Infof("Starting authentication secret reconciler")
defer klog.Infof("Shutting down authentication secret reconciler")
go wait.UntilWithContext(ctx, d.runAuthenticationSecretReconcilerWorker, time.Second)
<-ctx.Done()
}
func (d *descheduler) runAuthenticationSecretReconcilerWorker(ctx context.Context) {
for d.processNextWorkItem(ctx) {
}
}
func (d *descheduler) processNextWorkItem(ctx context.Context) bool {
dsKey, quit := d.queue.Get()
if quit {
return false
}
defer d.queue.Done(dsKey)
err := d.sync()
if err == nil {
d.queue.Forget(dsKey)
return true
}
utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
d.queue.AddRateLimited(dsKey)
return true
}
func (d *descheduler) sync() error {
prometheusConfig := d.metricsProviders[api.PrometheusMetrics].Prometheus
if prometheusConfig == nil || prometheusConfig.AuthToken == nil || prometheusConfig.AuthToken.SecretReference == nil {
return fmt.Errorf("prometheus metrics source configuration is missing authentication token secret")
}
ns := prometheusConfig.AuthToken.SecretReference.Namespace
name := prometheusConfig.AuthToken.SecretReference.Name
secretObj, err := d.namespacedSecretsLister.Get(name)
if err != nil {
// clear the token if the secret is not found
if apierrors.IsNotFound(err) {
d.currentPrometheusAuthToken = ""
if d.previousPrometheusClientTransport != nil {
d.previousPrometheusClientTransport.CloseIdleConnections()
}
d.previousPrometheusClientTransport = nil
d.prometheusClient = nil
}
return fmt.Errorf("unable to get %v/%v secret", ns, name)
}
authToken := string(secretObj.Data[prometheusAuthTokenSecretKey])
if authToken == "" {
return fmt.Errorf("prometheus authentication token secret missing %q data or empty", prometheusAuthTokenSecretKey)
}
if d.currentPrometheusAuthToken == authToken {
return nil
}
klog.V(2).Infof("authentication secret token updated, recreating prometheus client")
prometheusClient, transport, err := client.CreatePrometheusClient(prometheusConfig.URL, authToken)
if err != nil {
return fmt.Errorf("unable to create a prometheus client: %v", err)
}
d.prometheusClient = prometheusClient
if d.previousPrometheusClientTransport != nil {
d.previousPrometheusClientTransport.CloseIdleConnections()
}
d.previousPrometheusClientTransport = transport
d.currentPrometheusAuthToken = authToken
return nil
}
func (d *descheduler) eventHandler() cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { d.queue.Add(workQueueKey) },
UpdateFunc: func(old, new interface{}) { d.queue.Add(workQueueKey) },
DeleteFunc: func(obj interface{}) { d.queue.Add(workQueueKey) },
}
}
func (d *descheduler) runDeschedulerLoop(ctx context.Context, nodes []*v1.Node) error {
@@ -268,6 +422,7 @@ func (d *descheduler) runProfiles(ctx context.Context, client clientset.Interfac
frameworkprofile.WithPodEvictor(d.podEvictor),
frameworkprofile.WithGetPodsAssignedToNodeFnc(d.getPodsAssignedToNode),
frameworkprofile.WithMetricsCollector(d.metricsCollector),
frameworkprofile.WithPrometheusClient(d.prometheusClient),
)
if err != nil {
klog.ErrorS(err, "unable to create a profile", "profile", profile.Name)
@@ -332,7 +487,7 @@ func Run(ctx context.Context, rs *options.DeschedulerServer) error {
return err
}
if (deschedulerPolicy.MetricsCollector != nil && deschedulerPolicy.MetricsCollector.Enabled) || (len(deschedulerPolicy.MetricsProviders) > 0 && deschedulerPolicy.MetricsProviders[0].Source == api.KubernetesMetrics) {
if (deschedulerPolicy.MetricsCollector != nil && deschedulerPolicy.MetricsCollector.Enabled) || metricsProviderListToMap(deschedulerPolicy.MetricsProviders)[api.KubernetesMetrics] != nil {
metricsClient, err := client.CreateMetricsClient(clientConnection, "descheduler")
if err != nil {
return err
@@ -415,6 +570,14 @@ func podEvictionReactionFnc(fakeClient *fakeclientset.Clientset) func(action cor
}
}
type tokenReconciliation int
const (
noReconciliation tokenReconciliation = iota
inClusterReconciliation
secretReconciliation
)
func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string) error {
var span trace.Span
ctx, span = tracing.Tracer().Start(ctx, "RunDeschedulerStrategies")
@@ -436,7 +599,22 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
eventBroadcaster, eventRecorder := utils.GetRecorderAndBroadcaster(ctx, eventClient)
defer eventBroadcaster.Shutdown()
descheduler, err := newDescheduler(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion, eventRecorder, sharedInformerFactory)
var namespacedSharedInformerFactory informers.SharedInformerFactory
metricProviderTokenReconciliation := noReconciliation
prometheusProvider := metricsProviderListToMap(deschedulerPolicy.MetricsProviders)[api.PrometheusMetrics]
if prometheusProvider != nil && prometheusProvider.Prometheus != nil && prometheusProvider.Prometheus.URL != "" {
if prometheusProvider.Prometheus.AuthToken != nil {
// Will get reconciled
namespacedSharedInformerFactory = informers.NewSharedInformerFactoryWithOptions(rs.Client, 0, informers.WithTransform(trimManagedFields), informers.WithNamespace(prometheusProvider.Prometheus.AuthToken.SecretReference.Namespace))
metricProviderTokenReconciliation = secretReconciliation
} else {
// Use the sa token and assume it has the sufficient permissions to authenticate
metricProviderTokenReconciliation = inClusterReconciliation
}
}
descheduler, err := newDescheduler(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion, eventRecorder, sharedInformerFactory, namespacedSharedInformerFactory)
if err != nil {
span.AddEvent("Failed to create new descheduler", trace.WithAttributes(attribute.String("err", err.Error())))
return err
@@ -445,10 +623,17 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
defer cancel()
sharedInformerFactory.Start(ctx.Done())
if metricProviderTokenReconciliation == secretReconciliation {
namespacedSharedInformerFactory.Start(ctx.Done())
}
sharedInformerFactory.WaitForCacheSync(ctx.Done())
descheduler.podEvictor.WaitForEventHandlersSync(ctx)
if metricProviderTokenReconciliation == secretReconciliation {
namespacedSharedInformerFactory.WaitForCacheSync(ctx.Done())
}
if (deschedulerPolicy.MetricsCollector != nil && deschedulerPolicy.MetricsCollector.Enabled) || (len(deschedulerPolicy.MetricsProviders) > 0 && deschedulerPolicy.MetricsProviders[0].Source == api.KubernetesMetrics) {
if descheduler.metricsCollector != nil {
go func() {
klog.V(2).Infof("Starting metrics collector")
descheduler.metricsCollector.Run(ctx)
@@ -462,7 +647,19 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
}
}
if metricProviderTokenReconciliation == secretReconciliation {
go descheduler.runAuthenticationSecretReconciler(ctx)
}
wait.NonSlidingUntil(func() {
if metricProviderTokenReconciliation == inClusterReconciliation {
// Read the sa token and assume it has the sufficient permissions to authenticate
if err := descheduler.reconcileInClusterSAToken(); err != nil {
klog.ErrorS(err, "unable to reconcile an in cluster SA token")
return
}
}
// A next context is created here intentionally to avoid nesting the spans via context.
sCtx, sSpan := tracing.Tracer().Start(ctx, "NonSlidingUntil")
defer sSpan.End()

View File

@@ -193,7 +193,7 @@ func initDescheduler(t *testing.T, ctx context.Context, featureGates featuregate
sharedInformerFactory := informers.NewSharedInformerFactoryWithOptions(rs.Client, 0, informers.WithTransform(trimManagedFields))
eventBroadcaster, eventRecorder := utils.GetRecorderAndBroadcaster(ctx, client)
descheduler, err := newDescheduler(ctx, rs, internalDeschedulerPolicy, "v1", eventRecorder, sharedInformerFactory)
descheduler, err := newDescheduler(ctx, rs, internalDeschedulerPolicy, "v1", eventRecorder, sharedInformerFactory, nil)
if err != nil {
eventBroadcaster.Shutdown()
t.Fatalf("Unable to create a descheduler instance: %v", err)

View File

@@ -19,6 +19,7 @@ package descheduler
import (
"context"
"fmt"
"net/url"
"os"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
@@ -154,11 +155,42 @@ func validateDeschedulerConfiguration(in api.DeschedulerPolicy, registry pluginr
}
}
}
if len(in.MetricsProviders) > 1 {
errorsInPolicy = append(errorsInPolicy, fmt.Errorf("only a single metrics provider can be set, got %v instead", len(in.MetricsProviders)))
providers := map[api.MetricsSource]api.MetricsProvider{}
for _, provider := range in.MetricsProviders {
if _, ok := providers[provider.Source]; ok {
errorsInPolicy = append(errorsInPolicy, fmt.Errorf("metric provider %q is already configured, each source can be configured only once", provider.Source))
} else {
providers[provider.Source] = provider
}
}
if len(in.MetricsProviders) > 0 && in.MetricsCollector != nil && in.MetricsCollector.Enabled {
if _, exists := providers[api.KubernetesMetrics]; exists && in.MetricsCollector != nil && in.MetricsCollector.Enabled {
errorsInPolicy = append(errorsInPolicy, fmt.Errorf("it is not allowed to combine metrics provider when metrics collector is enabled"))
}
if prometheusConfig, exists := providers[api.PrometheusMetrics]; exists {
if prometheusConfig.Prometheus == nil {
errorsInPolicy = append(errorsInPolicy, fmt.Errorf("prometheus configuration is required when prometheus source is enabled"))
} else {
if prometheusConfig.Prometheus.URL == "" {
errorsInPolicy = append(errorsInPolicy, fmt.Errorf("prometheus URL is required when prometheus is enabled"))
} else {
u, err := url.Parse(prometheusConfig.Prometheus.URL)
if err != nil {
errorsInPolicy = append(errorsInPolicy, fmt.Errorf("error parsing prometheus URL: %v", err))
} else if u.Scheme != "https" {
errorsInPolicy = append(errorsInPolicy, fmt.Errorf("prometheus URL's scheme is not https, got %q instead", u.Scheme))
}
}
if prometheusConfig.Prometheus.AuthToken != nil {
secretRef := prometheusConfig.Prometheus.AuthToken.SecretReference
if secretRef == nil {
errorsInPolicy = append(errorsInPolicy, fmt.Errorf("prometheus authToken secret is expected to be set when authToken field is"))
} else if secretRef.Name == "" || secretRef.Namespace == "" {
errorsInPolicy = append(errorsInPolicy, fmt.Errorf("prometheus authToken secret reference does not set both namespace and name"))
}
}
}
}
return utilerrors.NewAggregate(errorsInPolicy)
}

View File

@@ -212,14 +212,14 @@ func TestValidateDeschedulerConfiguration(t *testing.T) {
result: fmt.Errorf("[in profile RemoveFailedPods: only one of Include/Exclude namespaces can be set, in profile RemovePodsViolatingTopologySpreadConstraint: only one of Include/Exclude namespaces can be set]"),
},
{
description: "Too many metrics providers error",
description: "Duplicit metrics providers error",
deschedulerPolicy: api.DeschedulerPolicy{
MetricsProviders: []api.MetricsProvider{
{Source: api.KubernetesMetrics},
{Source: api.KubernetesMetrics},
},
},
result: fmt.Errorf("only a single metrics provider can be set, got 2 instead"),
result: fmt.Errorf("metric provider \"KubernetesMetrics\" is already configured, each source can be configured only once"),
},
{
description: "Too many metrics providers error",
@@ -233,12 +233,145 @@ func TestValidateDeschedulerConfiguration(t *testing.T) {
},
result: fmt.Errorf("it is not allowed to combine metrics provider when metrics collector is enabled"),
},
{
description: "missing prometheus url error",
deschedulerPolicy: api.DeschedulerPolicy{
MetricsProviders: []api.MetricsProvider{
{
Source: api.PrometheusMetrics,
Prometheus: &api.Prometheus{},
},
},
},
result: fmt.Errorf("prometheus URL is required when prometheus is enabled"),
},
{
description: "prometheus url is not valid error",
deschedulerPolicy: api.DeschedulerPolicy{
MetricsProviders: []api.MetricsProvider{
{
Source: api.PrometheusMetrics,
Prometheus: &api.Prometheus{
URL: "http://example.com:-80",
},
},
},
},
result: fmt.Errorf("error parsing prometheus URL: parse \"http://example.com:-80\": invalid port \":-80\" after host"),
},
{
description: "prometheus url does not have https error",
deschedulerPolicy: api.DeschedulerPolicy{
MetricsProviders: []api.MetricsProvider{
{
Source: api.PrometheusMetrics,
Prometheus: &api.Prometheus{
URL: "http://example.com:80",
},
},
},
},
result: fmt.Errorf("prometheus URL's scheme is not https, got \"http\" instead"),
},
{
description: "prometheus authtoken with no secret reference error",
deschedulerPolicy: api.DeschedulerPolicy{
MetricsProviders: []api.MetricsProvider{
{
Source: api.PrometheusMetrics,
Prometheus: &api.Prometheus{
URL: "https://example.com:80",
AuthToken: &api.AuthToken{},
},
},
},
},
result: fmt.Errorf("prometheus authToken secret is expected to be set when authToken field is"),
},
{
description: "prometheus authtoken with empty secret reference error",
deschedulerPolicy: api.DeschedulerPolicy{
MetricsProviders: []api.MetricsProvider{
{
Source: api.PrometheusMetrics,
Prometheus: &api.Prometheus{
URL: "https://example.com:80",
AuthToken: &api.AuthToken{
SecretReference: &api.SecretReference{},
},
},
},
},
},
result: fmt.Errorf("prometheus authToken secret reference does not set both namespace and name"),
},
{
description: "prometheus authtoken missing secret reference namespace error",
deschedulerPolicy: api.DeschedulerPolicy{
MetricsProviders: []api.MetricsProvider{
{
Source: api.PrometheusMetrics,
Prometheus: &api.Prometheus{
URL: "https://example.com:80",
AuthToken: &api.AuthToken{
SecretReference: &api.SecretReference{
Name: "secretname",
},
},
},
},
},
},
result: fmt.Errorf("prometheus authToken secret reference does not set both namespace and name"),
},
{
description: "prometheus authtoken missing secret reference name error",
deschedulerPolicy: api.DeschedulerPolicy{
MetricsProviders: []api.MetricsProvider{
{
Source: api.PrometheusMetrics,
Prometheus: &api.Prometheus{
URL: "https://example.com:80",
AuthToken: &api.AuthToken{
SecretReference: &api.SecretReference{
Namespace: "secretnamespace",
},
},
},
},
},
},
result: fmt.Errorf("prometheus authToken secret reference does not set both namespace and name"),
},
{
description: "valid prometheus authtoken secret reference",
deschedulerPolicy: api.DeschedulerPolicy{
MetricsProviders: []api.MetricsProvider{
{
Source: api.PrometheusMetrics,
Prometheus: &api.Prometheus{
URL: "https://example.com:80",
AuthToken: &api.AuthToken{
SecretReference: &api.SecretReference{
Name: "secretname",
Namespace: "secretnamespace",
},
},
},
},
},
},
},
}
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
result := validateDeschedulerConfiguration(tc.deschedulerPolicy, pluginregistry.PluginRegistry)
if result.Error() != tc.result.Error() {
if result == nil && tc.result != nil || result != nil && tc.result == nil {
t.Errorf("test '%s' failed. expected \n'%s', got \n'%s'", tc.description, tc.result, result)
} else if result == nil && tc.result == nil {
return
} else if result.Error() != tc.result.Error() {
t.Errorf("test '%s' failed. expected \n'%s', got \n'%s'", tc.description, tc.result, result)
}
})

View File

@@ -11,6 +11,8 @@ import (
"sigs.k8s.io/descheduler/pkg/descheduler/metricscollector"
podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
promapi "github.com/prometheus/client_golang/api"
)
type HandleImpl struct {
@@ -20,6 +22,7 @@ type HandleImpl struct {
EvictorFilterImpl frameworktypes.EvictorPlugin
PodEvictorImpl *evictions.PodEvictor
MetricsCollectorImpl *metricscollector.MetricsCollector
PrometheusClientImpl promapi.Client
}
var _ frameworktypes.Handle = &HandleImpl{}
@@ -28,6 +31,10 @@ func (hi *HandleImpl) ClientSet() clientset.Interface {
return hi.ClientsetImpl
}
func (hi *HandleImpl) PrometheusClient() promapi.Client {
return hi.PrometheusClientImpl
}
func (hi *HandleImpl) MetricsCollector() *metricscollector.MetricsCollector {
return hi.MetricsCollectorImpl
}

View File

@@ -95,7 +95,7 @@ func (h *HighNodeUtilization) Name() string {
// Balance extension point implementation for the plugin
func (h *HighNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status {
if err := h.usageClient.sync(nodes); err != nil {
if err := h.usageClient.sync(ctx, nodes); err != nil {
return &frameworktypes.Status{
Err: fmt.Errorf("error getting node usage: %v", err),
}

View File

@@ -55,7 +55,23 @@ func NewLowNodeUtilization(args runtime.Object, handle frameworktypes.Handle) (f
return nil, fmt.Errorf("want args to be of type LowNodeUtilizationArgs, got %T", args)
}
setDefaultForLNUThresholds(lowNodeUtilizationArgsArgs.Thresholds, lowNodeUtilizationArgsArgs.TargetThresholds, lowNodeUtilizationArgsArgs.UseDeviationThresholds)
metricsUtilization := lowNodeUtilizationArgsArgs.MetricsUtilization
if metricsUtilization != nil && metricsUtilization.Source == api.PrometheusMetrics {
if metricsUtilization.Prometheus != nil && metricsUtilization.Prometheus.Query != "" {
uResourceNames := getResourceNames(lowNodeUtilizationArgsArgs.Thresholds)
oResourceNames := getResourceNames(lowNodeUtilizationArgsArgs.TargetThresholds)
if len(uResourceNames) != 1 || uResourceNames[0] != MetricResource {
return nil, fmt.Errorf("thresholds are expected to specify a single instance of %q resource, got %v instead", MetricResource, uResourceNames)
}
if len(oResourceNames) != 1 || oResourceNames[0] != MetricResource {
return nil, fmt.Errorf("targetThresholds are expected to specify a single instance of %q resource, got %v instead", MetricResource, oResourceNames)
}
} else {
return nil, fmt.Errorf("prometheus query is missing")
}
} else {
setDefaultForLNUThresholds(lowNodeUtilizationArgsArgs.Thresholds, lowNodeUtilizationArgsArgs.TargetThresholds, lowNodeUtilizationArgsArgs.UseDeviationThresholds)
}
underutilizationCriteria := []interface{}{
"CPU", lowNodeUtilizationArgsArgs.Thresholds[v1.ResourceCPU],
@@ -90,11 +106,23 @@ func NewLowNodeUtilization(args runtime.Object, handle frameworktypes.Handle) (f
var usageClient usageClient
// MetricsServer is deprecated, removed once dropped
if lowNodeUtilizationArgsArgs.MetricsUtilization != nil && (lowNodeUtilizationArgsArgs.MetricsUtilization.MetricsServer || lowNodeUtilizationArgsArgs.MetricsUtilization.Source == api.KubernetesMetrics) {
if handle.MetricsCollector() == nil {
return nil, fmt.Errorf("metrics client not initialized")
if metricsUtilization != nil {
switch {
case metricsUtilization.MetricsServer, metricsUtilization.Source == api.KubernetesMetrics:
if handle.MetricsCollector() == nil {
return nil, fmt.Errorf("metrics client not initialized")
}
usageClient = newActualUsageClient(resourceNames, handle.GetPodsAssignedToNodeFunc(), handle.MetricsCollector())
case metricsUtilization.Source == api.PrometheusMetrics:
if handle.PrometheusClient() == nil {
return nil, fmt.Errorf("prometheus client not initialized")
}
usageClient = newPrometheusUsageClient(handle.GetPodsAssignedToNodeFunc(), handle.PrometheusClient(), metricsUtilization.Prometheus.Query)
case metricsUtilization.Source != "":
return nil, fmt.Errorf("unrecognized metrics source")
default:
return nil, fmt.Errorf("metrics source is empty")
}
usageClient = newActualUsageClient(resourceNames, handle.GetPodsAssignedToNodeFunc(), handle.MetricsCollector())
} else {
usageClient = newRequestedUsageClient(resourceNames, handle.GetPodsAssignedToNodeFunc())
}
@@ -117,7 +145,7 @@ func (l *LowNodeUtilization) Name() string {
// Balance extension point implementation for the plugin
func (l *LowNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status {
if err := l.usageClient.sync(nodes); err != nil {
if err := l.usageClient.sync(ctx, nodes); err != nil {
return &frameworktypes.Status{
Err: fmt.Errorf("error getting node usage: %v", err),
}
@@ -126,12 +154,20 @@ func (l *LowNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *fra
nodesMap, nodesUsageMap, podListMap := getNodeUsageSnapshot(nodes, l.usageClient)
var nodeThresholdsMap map[string][]api.ResourceThresholds
if l.args.UseDeviationThresholds {
nodeThresholdsMap = getNodeThresholdsFromAverageNodeUsage(nodes, l.usageClient, l.args.Thresholds, l.args.TargetThresholds)
thresholds, average := getNodeThresholdsFromAverageNodeUsage(nodes, l.usageClient, l.args.Thresholds, l.args.TargetThresholds)
klog.InfoS("Average utilization through all nodes", "utilization", average)
// All nodes are expected to have the same thresholds
for nodeName := range thresholds {
klog.InfoS("Underutilization threshold based on average utilization", "threshold", thresholds[nodeName][0])
klog.InfoS("Overutilization threshold based on average utilization", "threshold", thresholds[nodeName][1])
break
}
nodeThresholdsMap = thresholds
} else {
nodeThresholdsMap = getStaticNodeThresholds(nodes, l.args.Thresholds, l.args.TargetThresholds)
}
nodesUsageAsNodeThresholdsMap := nodeUsageToResourceThresholds(nodesUsageMap, nodesMap)
nodesUsageAsNodeThresholdsMap := nodeUsageToResourceThresholds(nodesUsageMap, nodesMap)
nodeGroups := classifyNodeUsage(
nodesUsageAsNodeThresholdsMap,
nodeThresholdsMap,

View File

@@ -41,6 +41,8 @@ import (
frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
"sigs.k8s.io/descheduler/pkg/utils"
"sigs.k8s.io/descheduler/test"
"github.com/prometheus/common/model"
)
func TestLowNodeUtilization(t *testing.T) {
@@ -1257,9 +1259,9 @@ func TestLowNodeUtilization(t *testing.T) {
}
handle.MetricsCollectorImpl = collector
var metricsSource api.MetricsSource = ""
var metricsUtilization *MetricsUtilization
if metricsEnabled {
metricsSource = api.KubernetesMetrics
metricsUtilization = &MetricsUtilization{Source: api.KubernetesMetrics}
}
plugin, err := NewLowNodeUtilization(&LowNodeUtilizationArgs{
@@ -1268,9 +1270,7 @@ func TestLowNodeUtilization(t *testing.T) {
UseDeviationThresholds: tc.useDeviationThresholds,
EvictionLimits: tc.evictionLimits,
EvictableNamespaces: tc.evictableNamespaces,
MetricsUtilization: &MetricsUtilization{
Source: metricsSource,
},
MetricsUtilization: metricsUtilization,
},
handle)
if err != nil {
@@ -1444,3 +1444,241 @@ func TestLowNodeUtilizationWithTaints(t *testing.T) {
})
}
}
func withLocalStorage(pod *v1.Pod) {
// A pod with local storage.
test.SetNormalOwnerRef(pod)
pod.Spec.Volumes = []v1.Volume{
{
Name: "sample",
VolumeSource: v1.VolumeSource{
HostPath: &v1.HostPathVolumeSource{Path: "somePath"},
EmptyDir: &v1.EmptyDirVolumeSource{
SizeLimit: resource.NewQuantity(int64(10), resource.BinarySI),
},
},
},
}
// A Mirror Pod.
pod.Annotations = test.GetMirrorPodAnnotation()
}
func withCriticalPod(pod *v1.Pod) {
// A Critical Pod.
test.SetNormalOwnerRef(pod)
pod.Namespace = "kube-system"
priority := utils.SystemCriticalPriority
pod.Spec.Priority = &priority
}
func TestLowNodeUtilizationWithPrometheusMetrics(t *testing.T) {
n1NodeName := "n1"
n2NodeName := "n2"
n3NodeName := "n3"
testCases := []struct {
name string
samples model.Vector
nodes []*v1.Node
pods []*v1.Pod
expectedPodsEvicted uint
evictedPods []string
args *LowNodeUtilizationArgs
}{
{
name: "with instance:node_cpu:rate:sum query",
args: &LowNodeUtilizationArgs{
Thresholds: api.ResourceThresholds{
MetricResource: 30,
},
TargetThresholds: api.ResourceThresholds{
MetricResource: 50,
},
MetricsUtilization: &MetricsUtilization{
Source: api.PrometheusMetrics,
Prometheus: &Prometheus{
Query: "instance:node_cpu:rate:sum",
},
},
},
samples: model.Vector{
sample("instance:node_cpu:rate:sum", n1NodeName, 0.5695757575757561),
sample("instance:node_cpu:rate:sum", n2NodeName, 0.4245454545454522),
sample("instance:node_cpu:rate:sum", n3NodeName, 0.20381818181818104),
},
nodes: []*v1.Node{
test.BuildTestNode(n1NodeName, 4000, 3000, 9, nil),
test.BuildTestNode(n2NodeName, 4000, 3000, 10, nil),
test.BuildTestNode(n3NodeName, 4000, 3000, 10, nil),
},
pods: []*v1.Pod{
test.BuildTestPod("p1", 400, 0, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p2", 400, 0, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p3", 400, 0, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p4", 400, 0, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p5", 400, 0, n1NodeName, test.SetRSOwnerRef),
// These won't be evicted.
test.BuildTestPod("p6", 400, 0, n1NodeName, test.SetDSOwnerRef),
test.BuildTestPod("p7", 400, 0, n1NodeName, withLocalStorage),
test.BuildTestPod("p8", 400, 0, n1NodeName, withCriticalPod),
test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef),
},
expectedPodsEvicted: 1,
},
{
name: "with instance:node_cpu:rate:sum query with more evictions",
args: &LowNodeUtilizationArgs{
Thresholds: api.ResourceThresholds{
MetricResource: 30,
},
TargetThresholds: api.ResourceThresholds{
MetricResource: 50,
},
EvictionLimits: &api.EvictionLimits{
Node: ptr.To[uint](3),
},
MetricsUtilization: &MetricsUtilization{
Source: api.PrometheusMetrics,
Prometheus: &Prometheus{
Query: "instance:node_cpu:rate:sum",
},
},
},
samples: model.Vector{
sample("instance:node_cpu:rate:sum", n1NodeName, 0.5695757575757561),
sample("instance:node_cpu:rate:sum", n2NodeName, 0.4245454545454522),
sample("instance:node_cpu:rate:sum", n3NodeName, 0.20381818181818104),
},
nodes: []*v1.Node{
test.BuildTestNode(n1NodeName, 4000, 3000, 9, nil),
test.BuildTestNode(n2NodeName, 4000, 3000, 10, nil),
test.BuildTestNode(n3NodeName, 4000, 3000, 10, nil),
},
pods: []*v1.Pod{
test.BuildTestPod("p1", 400, 0, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p2", 400, 0, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p3", 400, 0, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p4", 400, 0, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p5", 400, 0, n1NodeName, test.SetRSOwnerRef),
// These won't be evicted.
test.BuildTestPod("p6", 400, 0, n1NodeName, test.SetDSOwnerRef),
test.BuildTestPod("p7", 400, 0, n1NodeName, withLocalStorage),
test.BuildTestPod("p8", 400, 0, n1NodeName, withCriticalPod),
test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef),
},
expectedPodsEvicted: 3,
},
{
name: "with instance:node_cpu:rate:sum query with deviation",
args: &LowNodeUtilizationArgs{
Thresholds: api.ResourceThresholds{
MetricResource: 5,
},
TargetThresholds: api.ResourceThresholds{
MetricResource: 5,
},
EvictionLimits: &api.EvictionLimits{
Node: ptr.To[uint](2),
},
UseDeviationThresholds: true,
MetricsUtilization: &MetricsUtilization{
Source: api.PrometheusMetrics,
Prometheus: &Prometheus{
Query: "instance:node_cpu:rate:sum",
},
},
},
samples: model.Vector{
sample("instance:node_cpu:rate:sum", n1NodeName, 0.5695757575757561),
sample("instance:node_cpu:rate:sum", n2NodeName, 0.4245454545454522),
sample("instance:node_cpu:rate:sum", n3NodeName, 0.20381818181818104),
},
nodes: []*v1.Node{
test.BuildTestNode(n1NodeName, 4000, 3000, 9, nil),
test.BuildTestNode(n2NodeName, 4000, 3000, 10, nil),
test.BuildTestNode(n3NodeName, 4000, 3000, 10, nil),
},
pods: []*v1.Pod{
test.BuildTestPod("p1", 400, 0, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p2", 400, 0, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p3", 400, 0, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p4", 400, 0, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p5", 400, 0, n1NodeName, test.SetRSOwnerRef),
// These won't be evicted.
test.BuildTestPod("p6", 400, 0, n1NodeName, test.SetDSOwnerRef),
test.BuildTestPod("p7", 400, 0, n1NodeName, withLocalStorage),
test.BuildTestPod("p8", 400, 0, n1NodeName, withCriticalPod),
test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef),
},
expectedPodsEvicted: 2,
},
}
for _, tc := range testCases {
testFnc := func(metricsEnabled bool, expectedPodsEvicted uint) func(t *testing.T) {
return func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var objs []runtime.Object
for _, node := range tc.nodes {
objs = append(objs, node)
}
for _, pod := range tc.pods {
objs = append(objs, pod)
}
fakeClient := fake.NewSimpleClientset(objs...)
podsForEviction := make(map[string]struct{})
for _, pod := range tc.evictedPods {
podsForEviction[pod] = struct{}{}
}
evictionFailed := false
if len(tc.evictedPods) > 0 {
fakeClient.Fake.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
getAction := action.(core.CreateAction)
obj := getAction.GetObject()
if eviction, ok := obj.(*policy.Eviction); ok {
if _, exists := podsForEviction[eviction.Name]; exists {
return true, obj, nil
}
evictionFailed = true
return true, nil, fmt.Errorf("pod %q was unexpectedly evicted", eviction.Name)
}
return true, obj, nil
})
}
handle, podEvictor, err := frameworktesting.InitFrameworkHandle(ctx, fakeClient, nil, defaultevictor.DefaultEvictorArgs{NodeFit: true}, nil)
if err != nil {
t.Fatalf("Unable to initialize a framework handle: %v", err)
}
handle.PrometheusClientImpl = &fakePromClient{
result: tc.samples,
dataType: model.ValVector,
}
plugin, err := NewLowNodeUtilization(tc.args, handle)
if err != nil {
t.Fatalf("Unable to initialize the plugin: %v", err)
}
status := plugin.(frameworktypes.BalancePlugin).Balance(ctx, tc.nodes)
if status != nil {
t.Fatalf("Balance.err: %v", status.Err)
}
podsEvicted := podEvictor.TotalEvicted()
if expectedPodsEvicted != podsEvicted {
t.Errorf("Expected %v pods to be evicted but %v got evicted", expectedPodsEvicted, podsEvicted)
}
if evictionFailed {
t.Errorf("Pod evictions failed unexpectedly")
}
}
}
t.Run(tc.name, testFnc(false, tc.expectedPodsEvicted))
}
}

View File

@@ -57,6 +57,7 @@ import (
// - thresholds: map[string][]api.ReferencedResourceList
// - pod list: map[string][]*v1.Pod
// Once the nodes are classified produce the original []NodeInfo so the code is not that much changed (postponing further refactoring once it is needed)
const MetricResource = v1.ResourceName("MetricResource")
// NodeUsage stores a node's info, pods on it, thresholds and its resource usage
type NodeUsage struct {
@@ -94,20 +95,30 @@ func normalizePercentage(percent api.Percentage) api.Percentage {
return percent
}
func nodeCapacity(node *v1.Node, nodeUsage api.ReferencedResourceList) v1.ResourceList {
capacity := node.Status.Capacity
if len(node.Status.Allocatable) > 0 {
capacity = node.Status.Allocatable
}
// the usage captures the metrics resource
if _, ok := nodeUsage[MetricResource]; ok {
// Make ResourceMetrics 100% => 100 points
capacity[MetricResource] = *resource.NewQuantity(int64(100), resource.DecimalSI)
}
return capacity
}
func getNodeThresholdsFromAverageNodeUsage(
nodes []*v1.Node,
usageClient usageClient,
lowSpan, highSpan api.ResourceThresholds,
) map[string][]api.ResourceThresholds {
) (map[string][]api.ResourceThresholds, api.ResourceThresholds) {
total := api.ResourceThresholds{}
average := api.ResourceThresholds{}
numberOfNodes := len(nodes)
for _, node := range nodes {
usage := usageClient.nodeUtilization(node.Name)
nodeCapacity := node.Status.Capacity
if len(node.Status.Allocatable) > 0 {
nodeCapacity = node.Status.Allocatable
}
nodeCapacity := nodeCapacity(node, usage)
for resource, value := range usage {
nodeCapacityValue := nodeCapacity[resource]
if resource == v1.ResourceCPU {
@@ -138,7 +149,7 @@ func getNodeThresholdsFromAverageNodeUsage(
highThreshold,
}
}
return nodeThresholds
return nodeThresholds, average
}
func getStaticNodeThresholds(
@@ -216,10 +227,7 @@ func roundTo2Decimals(percentage float64) float64 {
}
func resourceUsagePercentages(nodeUsage api.ReferencedResourceList, node *v1.Node, round bool) api.ResourceThresholds {
nodeCapacity := node.Status.Capacity
if len(node.Status.Allocatable) > 0 {
nodeCapacity = node.Status.Allocatable
}
nodeCapacity := nodeCapacity(node, nodeUsage)
resourceUsagePercentage := api.ResourceThresholds{}
for resourceName, resourceUsage := range nodeUsage {
@@ -395,16 +403,29 @@ func evictPods(
if !preEvictionFilterWithOptions(pod) {
continue
}
// In case podUsage does not support resource counting (e.g. provided metric
// does not quantify pod resource utilization).
unconstrainedResourceEviction := false
podUsage, err := usageClient.podUsage(pod)
if err != nil {
klog.Errorf("unable to get pod usage for %v/%v: %v", pod.Namespace, pod.Name, err)
continue
if _, ok := err.(*notSupportedError); !ok {
klog.Errorf("unable to get pod usage for %v/%v: %v", pod.Namespace, pod.Name, err)
continue
}
unconstrainedResourceEviction = true
}
err = podEvictor.Evict(ctx, pod, evictOptions)
if err == nil {
if maxNoOfPodsToEvictPerNode == nil && unconstrainedResourceEviction {
klog.V(3).InfoS("Currently, only a single pod eviction is allowed")
break
}
evictionCounter++
klog.V(3).InfoS("Evicted pods", "pod", klog.KObj(pod))
if unconstrainedResourceEviction {
continue
}
for name := range totalAvailableUsage {
if name == v1.ResourcePods {
nodeInfo.usage[name].Sub(*resource.NewQuantity(1, resource.DecimalSI))

View File

@@ -55,13 +55,24 @@ type HighNodeUtilizationArgs struct {
}
// MetricsUtilization allow to consume actual resource utilization from metrics
// +k8s:deepcopy-gen=true
type MetricsUtilization struct {
// metricsServer enables metrics from a kubernetes metrics server.
// Please see https://kubernetes-sigs.github.io/metrics-server/ for more.
// Deprecated. Use MetricsSource instead.
// Deprecated. Use Source instead.
MetricsServer bool `json:"metricsServer,omitempty"`
// source enables the plugin to consume metrics from a metrics source.
// Currently only KubernetesMetrics available.
Source api.MetricsSource `json:"source,omitempty"`
// prometheus enables metrics collection through a prometheus query.
Prometheus *Prometheus `json:"prometheus,omitempty"`
}
type Prometheus struct {
// query returning a vector of samples, each sample labeled with `instance`
// corresponding to a node name with each sample value as a real number
// in <0; 1> interval.
Query string `json:"query,omitempty"`
}

View File

@@ -19,7 +19,11 @@ package nodeutilization
import (
"context"
"fmt"
"time"
promapi "github.com/prometheus/client_golang/api"
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -33,11 +37,33 @@ import (
"sigs.k8s.io/descheduler/pkg/utils"
)
type UsageClientType int
const (
requestedUsageClientType UsageClientType = iota
actualUsageClientType
prometheusUsageClientType
)
type notSupportedError struct {
usageClientType UsageClientType
}
func (e notSupportedError) Error() string {
return "maximum number of evicted pods per node reached"
}
func newNotSupportedError(usageClientType UsageClientType) *notSupportedError {
return &notSupportedError{
usageClientType: usageClientType,
}
}
type usageClient interface {
// Both low/high node utilization plugins are expected to invoke sync right
// after Balance method is invoked. There's no cache invalidation so each
// Balance is expected to get the latest data by invoking sync.
sync(nodes []*v1.Node) error
sync(ctx context.Context, nodes []*v1.Node) error
nodeUtilization(node string) api.ReferencedResourceList
pods(node string) []*v1.Pod
podUsage(pod *v1.Pod) (api.ReferencedResourceList, error)
@@ -79,7 +105,7 @@ func (s *requestedUsageClient) podUsage(pod *v1.Pod) (api.ReferencedResourceList
return usage, nil
}
func (s *requestedUsageClient) sync(nodes []*v1.Node) error {
func (s *requestedUsageClient) sync(ctx context.Context, nodes []*v1.Node) error {
s._nodeUtilization = make(map[string]api.ReferencedResourceList)
s._pods = make(map[string][]*v1.Pod)
@@ -165,7 +191,7 @@ func (client *actualUsageClient) podUsage(pod *v1.Pod) (api.ReferencedResourceLi
return totalUsage, nil
}
func (client *actualUsageClient) sync(nodes []*v1.Node) error {
func (client *actualUsageClient) sync(ctx context.Context, nodes []*v1.Node) error {
client._nodeUtilization = make(map[string]api.ReferencedResourceList)
client._pods = make(map[string][]*v1.Pod)
@@ -200,3 +226,95 @@ func (client *actualUsageClient) sync(nodes []*v1.Node) error {
return nil
}
type prometheusUsageClient struct {
getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc
promClient promapi.Client
promQuery string
_pods map[string][]*v1.Pod
_nodeUtilization map[string]map[v1.ResourceName]*resource.Quantity
}
var _ usageClient = &actualUsageClient{}
func newPrometheusUsageClient(
getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc,
promClient promapi.Client,
promQuery string,
) *prometheusUsageClient {
return &prometheusUsageClient{
getPodsAssignedToNode: getPodsAssignedToNode,
promClient: promClient,
promQuery: promQuery,
}
}
func (client *prometheusUsageClient) nodeUtilization(node string) map[v1.ResourceName]*resource.Quantity {
return client._nodeUtilization[node]
}
func (client *prometheusUsageClient) pods(node string) []*v1.Pod {
return client._pods[node]
}
func (client *prometheusUsageClient) podUsage(pod *v1.Pod) (map[v1.ResourceName]*resource.Quantity, error) {
return nil, newNotSupportedError(prometheusUsageClientType)
}
func NodeUsageFromPrometheusMetrics(ctx context.Context, promClient promapi.Client, promQuery string) (map[string]map[v1.ResourceName]*resource.Quantity, error) {
results, warnings, err := promv1.NewAPI(promClient).Query(ctx, promQuery, time.Now())
if err != nil {
return nil, fmt.Errorf("unable to capture prometheus metrics: %v", err)
}
if len(warnings) > 0 {
klog.Infof("prometheus metrics warnings: %v", warnings)
}
if results.Type() != model.ValVector {
return nil, fmt.Errorf("expected query results to be of type %q, got %q instead", model.ValVector, results.Type())
}
nodeUsages := make(map[string]map[v1.ResourceName]*resource.Quantity)
for _, sample := range results.(model.Vector) {
nodeName, exists := sample.Metric["instance"]
if !exists {
return nil, fmt.Errorf("The collected metrics sample is missing 'instance' key")
}
if sample.Value < 0 || sample.Value > 1 {
return nil, fmt.Errorf("The collected metrics sample for %q has value %v outside of <0; 1> interval", string(nodeName), sample.Value)
}
nodeUsages[string(nodeName)] = map[v1.ResourceName]*resource.Quantity{
MetricResource: resource.NewQuantity(int64(sample.Value*100), resource.DecimalSI),
}
}
return nodeUsages, nil
}
func (client *prometheusUsageClient) sync(ctx context.Context, nodes []*v1.Node) error {
client._nodeUtilization = make(map[string]map[v1.ResourceName]*resource.Quantity)
client._pods = make(map[string][]*v1.Pod)
nodeUsages, err := NodeUsageFromPrometheusMetrics(ctx, client.promClient, client.promQuery)
if err != nil {
return err
}
for _, node := range nodes {
if _, exists := nodeUsages[node.Name]; !exists {
return fmt.Errorf("unable to find metric entry for %v", node.Name)
}
pods, err := podutil.ListPodsOnANode(node.Name, client.getPodsAssignedToNode, nil)
if err != nil {
klog.V(2).InfoS("Node will not be processed, error accessing its pods", "node", klog.KObj(node), "err", err)
return fmt.Errorf("error accessing %q node's pods: %v", node.Name, err)
}
// store the snapshot of pods from the same (or the closest) node utilization computation
client._pods[node.Name] = pods
client._nodeUtilization[node.Name] = nodeUsages[node.Name]
}
return nil
}

View File

@@ -18,9 +18,14 @@ package nodeutilization
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"testing"
"github.com/prometheus/common/model"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/labels"
@@ -58,9 +63,9 @@ func updateMetricsAndCheckNodeUtilization(
if err != nil {
t.Fatalf("failed to capture metrics: %v", err)
}
err = usageClient.sync(nodes)
err = usageClient.sync(ctx, nodes)
if err != nil {
t.Fatalf("failed to capture a snapshot: %v", err)
t.Fatalf("failed to sync a snapshot: %v", err)
}
nodeUtilization := usageClient.nodeUtilization(nodeName)
t.Logf("current node cpu usage: %v\n", nodeUtilization[v1.ResourceCPU].MilliValue())
@@ -137,3 +142,158 @@ func TestActualUsageClient(t *testing.T) {
metricsClientset, collector, usageClient, nodes, n2.Name, n2metrics,
)
}
type fakePromClient struct {
result interface{}
dataType model.ValueType
}
type fakePayload struct {
Status string `json:"status"`
Data queryResult `json:"data"`
}
type queryResult struct {
Type model.ValueType `json:"resultType"`
Result interface{} `json:"result"`
}
func (client *fakePromClient) URL(ep string, args map[string]string) *url.URL {
return &url.URL{}
}
func (client *fakePromClient) Do(ctx context.Context, request *http.Request) (*http.Response, []byte, error) {
jsonData, err := json.Marshal(fakePayload{
Status: "success",
Data: queryResult{
Type: client.dataType,
Result: client.result,
},
})
return &http.Response{StatusCode: 200}, jsonData, err
}
func sample(metricName, nodeName string, value float64) *model.Sample {
return &model.Sample{
Metric: model.Metric{
"__name__": model.LabelValue(metricName),
"instance": model.LabelValue(nodeName),
},
Value: model.SampleValue(value),
Timestamp: 1728991761711,
}
}
func TestPrometheusUsageClient(t *testing.T) {
n1 := test.BuildTestNode("ip-10-0-17-165.ec2.internal", 2000, 3000, 10, nil)
n2 := test.BuildTestNode("ip-10-0-51-101.ec2.internal", 2000, 3000, 10, nil)
n3 := test.BuildTestNode("ip-10-0-94-25.ec2.internal", 2000, 3000, 10, nil)
nodes := []*v1.Node{n1, n2, n3}
p1 := test.BuildTestPod("p1", 400, 0, n1.Name, nil)
p21 := test.BuildTestPod("p21", 400, 0, n2.Name, nil)
p22 := test.BuildTestPod("p22", 400, 0, n2.Name, nil)
p3 := test.BuildTestPod("p3", 400, 0, n3.Name, nil)
tests := []struct {
name string
result interface{}
dataType model.ValueType
nodeUsage map[string]int64
err error
}{
{
name: "valid data",
dataType: model.ValVector,
result: model.Vector{
sample("instance:node_cpu:rate:sum", "ip-10-0-51-101.ec2.internal", 0.20381818181818104),
sample("instance:node_cpu:rate:sum", "ip-10-0-17-165.ec2.internal", 0.4245454545454522),
sample("instance:node_cpu:rate:sum", "ip-10-0-94-25.ec2.internal", 0.5695757575757561),
},
nodeUsage: map[string]int64{
"ip-10-0-51-101.ec2.internal": 20,
"ip-10-0-17-165.ec2.internal": 42,
"ip-10-0-94-25.ec2.internal": 56,
},
},
{
name: "invalid data missing instance label",
dataType: model.ValVector,
result: model.Vector{
&model.Sample{
Metric: model.Metric{
"__name__": model.LabelValue("instance:node_cpu:rate:sum"),
},
Value: model.SampleValue(0.20381818181818104),
Timestamp: 1728991761711,
},
},
err: fmt.Errorf("The collected metrics sample is missing 'instance' key"),
},
{
name: "invalid data value out of range",
dataType: model.ValVector,
result: model.Vector{
sample("instance:node_cpu:rate:sum", "ip-10-0-51-101.ec2.internal", 1.20381818181818104),
},
err: fmt.Errorf("The collected metrics sample for \"ip-10-0-51-101.ec2.internal\" has value 1.203818181818181 outside of <0; 1> interval"),
},
{
name: "invalid data not a vector",
dataType: model.ValScalar,
result: model.Scalar{
Value: model.SampleValue(0.20381818181818104),
Timestamp: 1728991761711,
},
err: fmt.Errorf("expected query results to be of type \"vector\", got \"scalar\" instead"),
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
pClient := &fakePromClient{
result: tc.result,
dataType: tc.dataType,
}
clientset := fakeclientset.NewSimpleClientset(n1, n2, n3, p1, p21, p22, p3)
ctx := context.TODO()
sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0)
podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
podsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer)
if err != nil {
t.Fatalf("Build get pods assigned to node function error: %v", err)
}
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())
prometheusUsageClient := newPrometheusUsageClient(podsAssignedToNode, pClient, "instance:node_cpu:rate:sum")
err = prometheusUsageClient.sync(ctx, nodes)
if tc.err == nil {
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
} else {
if err == nil {
t.Fatalf("unexpected %q error, got nil instead", tc.err)
} else if err.Error() != tc.err.Error() {
t.Fatalf("expected %q error, got %q instead", tc.err, err)
}
return
}
for _, node := range nodes {
nodeUtil := prometheusUsageClient.nodeUtilization(node.Name)
if nodeUtil[MetricResource].Value() != tc.nodeUsage[node.Name] {
t.Fatalf("expected %q node utilization to be %v, got %v instead", node.Name, tc.nodeUsage[node.Name], nodeUtil[MetricResource])
} else {
t.Logf("%v node utilization: %v", node.Name, nodeUtil[MetricResource])
}
}
})
}
}

View File

@@ -44,6 +44,17 @@ func ValidateLowNodeUtilizationArgs(obj runtime.Object) error {
if err != nil {
return err
}
if args.MetricsUtilization != nil {
if args.MetricsUtilization.Source == api.KubernetesMetrics && args.MetricsUtilization.MetricsServer {
return fmt.Errorf("it is not allowed to set both %q source and metricsServer", api.KubernetesMetrics)
}
if args.MetricsUtilization.Source == api.KubernetesMetrics && args.MetricsUtilization.Prometheus != nil {
return fmt.Errorf("prometheus configuration is not allowed to set when source is set to %q", api.KubernetesMetrics)
}
if args.MetricsUtilization.Source == api.PrometheusMetrics && (args.MetricsUtilization.Prometheus == nil || args.MetricsUtilization.Prometheus.Query == "") {
return fmt.Errorf("prometheus query is required when metrics source is set to %q", api.PrometheusMetrics)
}
}
return nil
}

View File

@@ -183,6 +183,65 @@ func TestValidateLowNodeUtilizationPluginConfig(t *testing.T) {
},
errInfo: nil,
},
{
name: "setting both kubernetes metrics source and metricsserver",
args: &LowNodeUtilizationArgs{
Thresholds: api.ResourceThresholds{
v1.ResourceCPU: 20,
v1.ResourceMemory: 20,
extendedResource: 20,
},
TargetThresholds: api.ResourceThresholds{
v1.ResourceCPU: 80,
v1.ResourceMemory: 80,
extendedResource: 80,
},
MetricsUtilization: &MetricsUtilization{
MetricsServer: true,
Source: api.KubernetesMetrics,
},
},
errInfo: fmt.Errorf("it is not allowed to set both \"KubernetesMetrics\" source and metricsServer"),
},
{
name: "missing prometheus query",
args: &LowNodeUtilizationArgs{
Thresholds: api.ResourceThresholds{
v1.ResourceCPU: 20,
v1.ResourceMemory: 20,
extendedResource: 20,
},
TargetThresholds: api.ResourceThresholds{
v1.ResourceCPU: 80,
v1.ResourceMemory: 80,
extendedResource: 80,
},
MetricsUtilization: &MetricsUtilization{
Source: api.PrometheusMetrics,
},
},
errInfo: fmt.Errorf("prometheus query is required when metrics source is set to \"Prometheus\""),
},
{
name: "prometheus set when source set to kubernetes metrics",
args: &LowNodeUtilizationArgs{
Thresholds: api.ResourceThresholds{
v1.ResourceCPU: 20,
v1.ResourceMemory: 20,
extendedResource: 20,
},
TargetThresholds: api.ResourceThresholds{
v1.ResourceCPU: 80,
v1.ResourceMemory: 80,
extendedResource: 80,
},
MetricsUtilization: &MetricsUtilization{
Source: api.KubernetesMetrics,
Prometheus: &Prometheus{},
},
},
errInfo: fmt.Errorf("prometheus configuration is not allowed to set when source is set to \"KubernetesMetrics\""),
},
}
for _, testCase := range tests {
@@ -190,10 +249,10 @@ func TestValidateLowNodeUtilizationPluginConfig(t *testing.T) {
validateErr := ValidateLowNodeUtilizationArgs(runtime.Object(testCase.args))
if validateErr == nil || testCase.errInfo == nil {
if validateErr != testCase.errInfo {
t.Errorf("expected validity of plugin config: %v but got %v instead", testCase.errInfo, validateErr)
t.Errorf("expected validity of plugin config: %q but got %q instead", testCase.errInfo, validateErr)
}
} else if validateErr.Error() != testCase.errInfo.Error() {
t.Errorf("expected validity of plugin config: %v but got %v instead", testCase.errInfo, validateErr)
t.Errorf("expected validity of plugin config: %q but got %q instead", testCase.errInfo, validateErr)
}
})
}

View File

@@ -84,7 +84,7 @@ func (in *LowNodeUtilizationArgs) DeepCopyInto(out *LowNodeUtilizationArgs) {
if in.MetricsUtilization != nil {
in, out := &in.MetricsUtilization, &out.MetricsUtilization
*out = new(MetricsUtilization)
**out = **in
(*in).DeepCopyInto(*out)
}
if in.EvictableNamespaces != nil {
in, out := &in.EvictableNamespaces, &out.EvictableNamespaces
@@ -116,3 +116,24 @@ func (in *LowNodeUtilizationArgs) DeepCopyObject() runtime.Object {
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *MetricsUtilization) DeepCopyInto(out *MetricsUtilization) {
*out = *in
if in.Prometheus != nil {
in, out := &in.Prometheus, &out.Prometheus
*out = new(Prometheus)
**out = **in
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MetricsUtilization.
func (in *MetricsUtilization) DeepCopy() *MetricsUtilization {
if in == nil {
return nil
}
out := new(MetricsUtilization)
in.DeepCopyInto(out)
return out
}

View File

@@ -18,6 +18,7 @@ import (
"fmt"
"time"
promapi "github.com/prometheus/client_golang/api"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
@@ -68,6 +69,7 @@ func (ei *evictorImpl) Evict(ctx context.Context, pod *v1.Pod, opts evictions.Ev
// handleImpl implements the framework handle which gets passed to plugins
type handleImpl struct {
clientSet clientset.Interface
prometheusClient promapi.Client
metricsCollector *metricscollector.MetricsCollector
getPodsAssignedToNodeFunc podutil.GetPodsAssignedToNodeFunc
sharedInformerFactory informers.SharedInformerFactory
@@ -81,6 +83,10 @@ func (hi *handleImpl) ClientSet() clientset.Interface {
return hi.clientSet
}
func (hi *handleImpl) PrometheusClient() promapi.Client {
return hi.prometheusClient
}
func (hi *handleImpl) MetricsCollector() *metricscollector.MetricsCollector {
return hi.metricsCollector
}
@@ -131,6 +137,7 @@ type Option func(*handleImplOpts)
type handleImplOpts struct {
clientSet clientset.Interface
prometheusClient promapi.Client
sharedInformerFactory informers.SharedInformerFactory
getPodsAssignedToNodeFunc podutil.GetPodsAssignedToNodeFunc
podEvictor *evictions.PodEvictor
@@ -144,6 +151,13 @@ func WithClientSet(clientSet clientset.Interface) Option {
}
}
// WithPrometheusClient sets Prometheus client for the scheduling frameworkImpl.
func WithPrometheusClient(prometheusClient promapi.Client) Option {
return func(o *handleImplOpts) {
o.prometheusClient = prometheusClient
}
}
func WithSharedInformerFactory(sharedInformerFactory informers.SharedInformerFactory) Option {
return func(o *handleImplOpts) {
o.sharedInformerFactory = sharedInformerFactory
@@ -267,6 +281,7 @@ func NewProfile(config api.DeschedulerProfile, reg pluginregistry.Registry, opts
podEvictor: hOpts.podEvictor,
},
metricsCollector: hOpts.metricsCollector,
prometheusClient: hOpts.prometheusClient,
}
pluginNames := append(config.Plugins.Deschedule.Enabled, config.Plugins.Balance.Enabled...)

View File

@@ -26,6 +26,8 @@ import (
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
"sigs.k8s.io/descheduler/pkg/descheduler/metricscollector"
podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
promapi "github.com/prometheus/client_golang/api"
)
// Handle provides handles used by plugins to retrieve a kubernetes client set,
@@ -34,6 +36,7 @@ import (
type Handle interface {
// ClientSet returns a kubernetes clientSet.
ClientSet() clientset.Interface
PrometheusClient() promapi.Client
Evictor() Evictor
GetPodsAssignedToNodeFunc() podutil.GetPodsAssignedToNodeFunc
SharedInformerFactory() informers.SharedInformerFactory