From 6567f01e86049f4b562a5fc028f5eeca0c94c22a Mon Sep 17 00:00:00 2001 From: Jan Chaloupka Date: Thu, 7 Nov 2024 16:23:16 +0100 Subject: [PATCH] [nodeutilization]: actual usage client through kubernetes metrics --- README.md | 15 +- cmd/descheduler/app/options/options.go | 3 + kubernetes/base/rbac.yaml | 3 + pkg/api/types.go | 10 + pkg/api/v1alpha2/types.go | 10 + pkg/api/v1alpha2/zz_generated.conversion.go | 36 ++ pkg/api/v1alpha2/zz_generated.deepcopy.go | 17 + pkg/api/zz_generated.deepcopy.go | 17 + pkg/descheduler/client/client.go | 27 +- pkg/descheduler/descheduler.go | 79 ++- pkg/descheduler/descheduler_test.go | 134 ++++- .../metricscollector/metricscollector.go | 151 ++++++ .../metricscollector/metricscollector_test.go | 141 +++++ pkg/framework/fake/fake.go | 6 + .../nodeutilization/lownodeutilization.go | 13 +- .../lownodeutilization_test.go | 481 ++++++++++++++---- .../plugins/nodeutilization/types.go | 14 +- .../plugins/nodeutilization/usageclients.go | 99 ++++ .../nodeutilization/usageclients_test.go | 139 +++++ .../nodeutilization/zz_generated.deepcopy.go | 2 + pkg/framework/profile/profile.go | 30 +- pkg/framework/types/types.go | 2 + test/e2e/e2e_lownodeutilization_test.go | 304 +++++++++++ test/run-e2e-tests.sh | 5 + test/test_utils.go | 39 +- 25 files changed, 1643 insertions(+), 134 deletions(-) create mode 100644 pkg/descheduler/metricscollector/metricscollector.go create mode 100644 pkg/descheduler/metricscollector/metricscollector_test.go create mode 100644 pkg/framework/plugins/nodeutilization/usageclients_test.go create mode 100644 test/e2e/e2e_lownodeutilization_test.go diff --git a/README.md b/README.md index 1ffce0ad5..85cc7509e 100644 --- a/README.md +++ b/README.md @@ -124,6 +124,8 @@ These are top level keys in the Descheduler Policy that you can use to configure | `maxNoOfPodsToEvictPerNode` |`int`| `nil` | maximum number of pods evicted from each node (summed through all strategies) | | `maxNoOfPodsToEvictPerNamespace` |`int`| `nil` | maximum number of pods evicted from each namespace (summed through all strategies) | | `maxNoOfPodsToEvictTotal` |`int`| `nil` | maximum number of pods evicted per rescheduling cycle (summed through all strategies) | +| `metricsCollector` |`object`| `nil` | configures collection of metrics for actual resource utilization | +| `metricsCollector.enabled` |`bool`| `false` | enables kubernetes [metrics server](https://kubernetes-sigs.github.io/metrics-server/) collection | ### Evictor Plugin configuration (Default Evictor) @@ -158,6 +160,8 @@ nodeSelector: "node=node1" # you don't need to set this, if not set all will be maxNoOfPodsToEvictPerNode: 5000 # you don't need to set this, unlimited if not set maxNoOfPodsToEvictPerNamespace: 5000 # you don't need to set this, unlimited if not set maxNoOfPodsToEvictTotal: 5000 # you don't need to set this, unlimited if not set +metricsCollector: + enabled: true # you don't need to set this, metrics are not collected if not set profiles: - name: ProfileName pluginConfig: @@ -277,11 +281,13 @@ If that parameter is set to `true`, the thresholds are considered as percentage `thresholds` will be deducted from the mean among all nodes and `targetThresholds` will be added to the mean. A resource consumption above (resp. below) this window is considered as overutilization (resp. underutilization). -**NOTE:** Node resource consumption is determined by the requests and limits of pods, not actual usage. 
+**NOTE:** By default node resource consumption is determined by the requests and limits of pods, not actual usage. This approach is chosen in order to maintain consistency with the kube-scheduler, which follows the same design for scheduling pods onto nodes. This means that resource usage as reported by Kubelet (or commands like `kubectl top`) may differ from the calculated consumption, due to these components reporting -actual usage metrics. Implementing metrics-based descheduling is currently TODO for the project. +actual usage metrics. Metrics-based descheduling can be enabled by setting `metricsUtilization.metricsServer` field. +In order to have the plugin consume the metrics the metric collector needs to be configured as well. +See `metricsCollector` field at [Top Level configuration](#top-level-configuration) for available options. **Parameters:** @@ -292,6 +298,9 @@ actual usage metrics. Implementing metrics-based descheduling is currently TODO |`targetThresholds`|map(string:int)| |`numberOfNodes`|int| |`evictableNamespaces`|(see [namespace filtering](#namespace-filtering))| +|`metricsUtilization`|object| +|`metricsUtilization.metricsServer`|bool| + **Example:** @@ -311,6 +320,8 @@ profiles: "cpu" : 50 "memory": 50 "pods": 50 + metricsUtilization: + metricsServer: true plugins: balance: enabled: diff --git a/cmd/descheduler/app/options/options.go b/cmd/descheduler/app/options/options.go index 3821f64c3..3d84c3158 100644 --- a/cmd/descheduler/app/options/options.go +++ b/cmd/descheduler/app/options/options.go @@ -22,6 +22,7 @@ import ( "time" "github.com/spf13/pflag" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" apiserver "k8s.io/apiserver/pkg/server" apiserveroptions "k8s.io/apiserver/pkg/server/options" @@ -33,6 +34,7 @@ import ( componentbaseoptions "k8s.io/component-base/config/options" "k8s.io/component-base/featuregate" "k8s.io/klog/v2" + metricsclient "k8s.io/metrics/pkg/client/clientset/versioned" "sigs.k8s.io/descheduler/pkg/apis/componentconfig" "sigs.k8s.io/descheduler/pkg/apis/componentconfig/v1alpha1" @@ -51,6 +53,7 @@ type DeschedulerServer struct { Client clientset.Interface EventClient clientset.Interface + MetricsClient metricsclient.Interface SecureServing *apiserveroptions.SecureServingOptionsWithLoopback SecureServingInfo *apiserver.SecureServingInfo DisableMetrics bool diff --git a/kubernetes/base/rbac.yaml b/kubernetes/base/rbac.yaml index f38ed3eb6..ab628cb51 100644 --- a/kubernetes/base/rbac.yaml +++ b/kubernetes/base/rbac.yaml @@ -32,6 +32,9 @@ rules: resources: ["leases"] resourceNames: ["descheduler"] verbs: ["get", "patch", "delete"] +- apiGroups: ["metrics.k8s.io"] + resources: ["nodes", "pods"] + verbs: ["get", "list"] --- apiVersion: v1 kind: ServiceAccount diff --git a/pkg/api/types.go b/pkg/api/types.go index 6917e6cdb..f282d0fe3 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -41,6 +41,9 @@ type DeschedulerPolicy struct { // MaxNoOfPodsToTotal restricts maximum of pods to be evicted total. MaxNoOfPodsToEvictTotal *uint + + // MetricsCollector configures collection of metrics about actual resource utilization + MetricsCollector MetricsCollector } // Namespaces carries a list of included/excluded namespaces @@ -84,3 +87,10 @@ type PluginSet struct { Enabled []string Disabled []string } + +// MetricsCollector configures collection of metrics about actual resource utilization +type MetricsCollector struct { + // Enabled metrics collection from kubernetes metrics. + // Later, the collection can be extended to other providers. 
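+	// It is exposed to users as the top-level `metricsCollector.enabled` key
+	// in the descheduler policy (see the README table added in this patch).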
+ Enabled bool +} diff --git a/pkg/api/v1alpha2/types.go b/pkg/api/v1alpha2/types.go index d663efb86..702dc16fd 100644 --- a/pkg/api/v1alpha2/types.go +++ b/pkg/api/v1alpha2/types.go @@ -40,6 +40,9 @@ type DeschedulerPolicy struct { // MaxNoOfPodsToTotal restricts maximum of pods to be evicted total. MaxNoOfPodsToEvictTotal *uint `json:"maxNoOfPodsToEvictTotal,omitempty"` + + // MetricsCollector configures collection of metrics for actual resource utilization + MetricsCollector MetricsCollector `json:"metricsCollector,omitempty"` } type DeschedulerProfile struct { @@ -66,3 +69,10 @@ type PluginSet struct { Enabled []string `json:"enabled"` Disabled []string `json:"disabled"` } + +// MetricsCollector configures collection of metrics about actual resource utilization +type MetricsCollector struct { + // Enabled metrics collection from kubernetes metrics. + // Later, the collection can be extended to other providers. + Enabled bool `json:"enabled,omitempty"` +} diff --git a/pkg/api/v1alpha2/zz_generated.conversion.go b/pkg/api/v1alpha2/zz_generated.conversion.go index a13151ed2..1b33be25c 100644 --- a/pkg/api/v1alpha2/zz_generated.conversion.go +++ b/pkg/api/v1alpha2/zz_generated.conversion.go @@ -46,6 +46,16 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } + if err := s.AddGeneratedConversionFunc((*MetricsCollector)(nil), (*api.MetricsCollector)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha2_MetricsCollector_To_api_MetricsCollector(a.(*MetricsCollector), b.(*api.MetricsCollector), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*api.MetricsCollector)(nil), (*MetricsCollector)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_api_MetricsCollector_To_v1alpha2_MetricsCollector(a.(*api.MetricsCollector), b.(*MetricsCollector), scope) + }); err != nil { + return err + } if err := s.AddGeneratedConversionFunc((*api.PluginConfig)(nil), (*PluginConfig)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_api_PluginConfig_To_v1alpha2_PluginConfig(a.(*api.PluginConfig), b.(*PluginConfig), scope) }); err != nil { @@ -105,6 +115,9 @@ func autoConvert_v1alpha2_DeschedulerPolicy_To_api_DeschedulerPolicy(in *Desched out.MaxNoOfPodsToEvictPerNode = (*uint)(unsafe.Pointer(in.MaxNoOfPodsToEvictPerNode)) out.MaxNoOfPodsToEvictPerNamespace = (*uint)(unsafe.Pointer(in.MaxNoOfPodsToEvictPerNamespace)) out.MaxNoOfPodsToEvictTotal = (*uint)(unsafe.Pointer(in.MaxNoOfPodsToEvictTotal)) + if err := Convert_v1alpha2_MetricsCollector_To_api_MetricsCollector(&in.MetricsCollector, &out.MetricsCollector, s); err != nil { + return err + } return nil } @@ -124,6 +137,9 @@ func autoConvert_api_DeschedulerPolicy_To_v1alpha2_DeschedulerPolicy(in *api.Des out.MaxNoOfPodsToEvictPerNode = (*uint)(unsafe.Pointer(in.MaxNoOfPodsToEvictPerNode)) out.MaxNoOfPodsToEvictPerNamespace = (*uint)(unsafe.Pointer(in.MaxNoOfPodsToEvictPerNamespace)) out.MaxNoOfPodsToEvictTotal = (*uint)(unsafe.Pointer(in.MaxNoOfPodsToEvictTotal)) + if err := Convert_api_MetricsCollector_To_v1alpha2_MetricsCollector(&in.MetricsCollector, &out.MetricsCollector, s); err != nil { + return err + } return nil } @@ -175,6 +191,26 @@ func Convert_api_DeschedulerProfile_To_v1alpha2_DeschedulerProfile(in *api.Desch return autoConvert_api_DeschedulerProfile_To_v1alpha2_DeschedulerProfile(in, out, s) } +func autoConvert_v1alpha2_MetricsCollector_To_api_MetricsCollector(in *MetricsCollector, out 
*api.MetricsCollector, s conversion.Scope) error { + out.Enabled = in.Enabled + return nil +} + +// Convert_v1alpha2_MetricsCollector_To_api_MetricsCollector is an autogenerated conversion function. +func Convert_v1alpha2_MetricsCollector_To_api_MetricsCollector(in *MetricsCollector, out *api.MetricsCollector, s conversion.Scope) error { + return autoConvert_v1alpha2_MetricsCollector_To_api_MetricsCollector(in, out, s) +} + +func autoConvert_api_MetricsCollector_To_v1alpha2_MetricsCollector(in *api.MetricsCollector, out *MetricsCollector, s conversion.Scope) error { + out.Enabled = in.Enabled + return nil +} + +// Convert_api_MetricsCollector_To_v1alpha2_MetricsCollector is an autogenerated conversion function. +func Convert_api_MetricsCollector_To_v1alpha2_MetricsCollector(in *api.MetricsCollector, out *MetricsCollector, s conversion.Scope) error { + return autoConvert_api_MetricsCollector_To_v1alpha2_MetricsCollector(in, out, s) +} + func autoConvert_v1alpha2_PluginConfig_To_api_PluginConfig(in *PluginConfig, out *api.PluginConfig, s conversion.Scope) error { out.Name = in.Name if err := runtime.Convert_runtime_RawExtension_To_runtime_Object(&in.Args, &out.Args, s); err != nil { diff --git a/pkg/api/v1alpha2/zz_generated.deepcopy.go b/pkg/api/v1alpha2/zz_generated.deepcopy.go index a0129fd8e..bfaf19878 100644 --- a/pkg/api/v1alpha2/zz_generated.deepcopy.go +++ b/pkg/api/v1alpha2/zz_generated.deepcopy.go @@ -56,6 +56,7 @@ func (in *DeschedulerPolicy) DeepCopyInto(out *DeschedulerPolicy) { *out = new(uint) **out = **in } + out.MetricsCollector = in.MetricsCollector return } @@ -101,6 +102,22 @@ func (in *DeschedulerProfile) DeepCopy() *DeschedulerProfile { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *MetricsCollector) DeepCopyInto(out *MetricsCollector) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MetricsCollector. +func (in *MetricsCollector) DeepCopy() *MetricsCollector { + if in == nil { + return nil + } + out := new(MetricsCollector) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PluginConfig) DeepCopyInto(out *PluginConfig) { *out = *in diff --git a/pkg/api/zz_generated.deepcopy.go b/pkg/api/zz_generated.deepcopy.go index 2f84cf1ed..2ac45bb6a 100644 --- a/pkg/api/zz_generated.deepcopy.go +++ b/pkg/api/zz_generated.deepcopy.go @@ -56,6 +56,7 @@ func (in *DeschedulerPolicy) DeepCopyInto(out *DeschedulerPolicy) { *out = new(uint) **out = **in } + out.MetricsCollector = in.MetricsCollector return } @@ -101,6 +102,22 @@ func (in *DeschedulerProfile) DeepCopy() *DeschedulerProfile { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *MetricsCollector) DeepCopyInto(out *MetricsCollector) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MetricsCollector. +func (in *MetricsCollector) DeepCopy() *MetricsCollector { + if in == nil { + return nil + } + out := new(MetricsCollector) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *Namespaces) DeepCopyInto(out *Namespaces) { *out = *in diff --git a/pkg/descheduler/client/client.go b/pkg/descheduler/client/client.go index 964ed3b9b..c5ff5e018 100644 --- a/pkg/descheduler/client/client.go +++ b/pkg/descheduler/client/client.go @@ -19,16 +19,16 @@ package client import ( "fmt" - clientset "k8s.io/client-go/kubernetes" - componentbaseconfig "k8s.io/component-base/config" - // Ensure to load all auth plugins. + clientset "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/plugin/pkg/client/auth" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + componentbaseconfig "k8s.io/component-base/config" + metricsclient "k8s.io/metrics/pkg/client/clientset/versioned" ) -func CreateClient(clientConnection componentbaseconfig.ClientConnectionConfiguration, userAgt string) (clientset.Interface, error) { +func createConfig(clientConnection componentbaseconfig.ClientConnectionConfiguration, userAgt string) (*rest.Config, error) { var cfg *rest.Config if len(clientConnection.Kubeconfig) != 0 { master, err := GetMasterFromKubeconfig(clientConnection.Kubeconfig) @@ -56,9 +56,28 @@ func CreateClient(clientConnection componentbaseconfig.ClientConnectionConfigura cfg = rest.AddUserAgent(cfg, userAgt) } + return cfg, nil +} + +func CreateClient(clientConnection componentbaseconfig.ClientConnectionConfiguration, userAgt string) (clientset.Interface, error) { + cfg, err := createConfig(clientConnection, userAgt) + if err != nil { + return nil, fmt.Errorf("unable to create config: %v", err) + } + return clientset.NewForConfig(cfg) } +func CreateMetricsClient(clientConnection componentbaseconfig.ClientConnectionConfiguration, userAgt string) (metricsclient.Interface, error) { + cfg, err := createConfig(clientConnection, userAgt) + if err != nil { + return nil, fmt.Errorf("unable to create config: %v", err) + } + + // Create the metrics clientset to access the metrics.k8s.io API + return metricsclient.NewForConfig(cfg) +} + func GetMasterFromKubeconfig(filename string) (string, error) { config, err := clientcmd.LoadFromFile(filename) if err != nil { diff --git a/pkg/descheduler/descheduler.go b/pkg/descheduler/descheduler.go index a38803706..212f9cdbb 100644 --- a/pkg/descheduler/descheduler.go +++ b/pkg/descheduler/descheduler.go @@ -23,44 +23,43 @@ import ( "strconv" "time" - policyv1 "k8s.io/api/policy/v1" - schedulingv1 "k8s.io/api/scheduling/v1" - "k8s.io/apimachinery/pkg/runtime/schema" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" + + v1 "k8s.io/api/core/v1" + policy "k8s.io/api/policy/v1" + policyv1 "k8s.io/api/policy/v1" + schedulingv1 "k8s.io/api/scheduling/v1" "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + utilversion "k8s.io/apimachinery/pkg/util/version" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/discovery" "k8s.io/client-go/informers" + clientset "k8s.io/client-go/kubernetes" + fakeclientset "k8s.io/client-go/kubernetes/fake" + core "k8s.io/client-go/testing" "k8s.io/client-go/tools/events" componentbaseconfig "k8s.io/component-base/config" "k8s.io/klog/v2" - v1 "k8s.io/api/core/v1" - policy "k8s.io/api/policy/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" - utilversion "k8s.io/apimachinery/pkg/util/version" - "k8s.io/apimachinery/pkg/util/wait" - clientset "k8s.io/client-go/kubernetes" - fakeclientset "k8s.io/client-go/kubernetes/fake" - core "k8s.io/client-go/testing" - - 
"sigs.k8s.io/descheduler/pkg/descheduler/client" - eutils "sigs.k8s.io/descheduler/pkg/descheduler/evictions/utils" - nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node" - "sigs.k8s.io/descheduler/pkg/tracing" - "sigs.k8s.io/descheduler/pkg/utils" - "sigs.k8s.io/descheduler/pkg/version" - "sigs.k8s.io/descheduler/cmd/descheduler/app/options" "sigs.k8s.io/descheduler/metrics" "sigs.k8s.io/descheduler/pkg/api" + "sigs.k8s.io/descheduler/pkg/descheduler/client" "sigs.k8s.io/descheduler/pkg/descheduler/evictions" + eutils "sigs.k8s.io/descheduler/pkg/descheduler/evictions/utils" + "sigs.k8s.io/descheduler/pkg/descheduler/metricscollector" + nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node" podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod" "sigs.k8s.io/descheduler/pkg/framework/pluginregistry" frameworkprofile "sigs.k8s.io/descheduler/pkg/framework/profile" frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types" + "sigs.k8s.io/descheduler/pkg/tracing" + "sigs.k8s.io/descheduler/pkg/utils" + "sigs.k8s.io/descheduler/pkg/version" ) type eprunner func(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status @@ -79,6 +78,7 @@ type descheduler struct { eventRecorder events.EventRecorder podEvictor *evictions.PodEvictor podEvictionReactionFnc func(*fakeclientset.Clientset) func(action core.Action) (bool, runtime.Object, error) + metricsCollector *metricscollector.MetricsCollector } type informerResources struct { @@ -163,6 +163,19 @@ func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedu return nil, err } + var metricsCollector *metricscollector.MetricsCollector + if deschedulerPolicy.MetricsCollector.Enabled { + nodeSelector := labels.Everything() + if deschedulerPolicy.NodeSelector != nil { + sel, err := labels.Parse(*deschedulerPolicy.NodeSelector) + if err != nil { + return nil, err + } + nodeSelector = sel + } + metricsCollector = metricscollector.NewMetricsCollector(sharedInformerFactory.Core().V1().Nodes().Lister(), rs.MetricsClient, nodeSelector) + } + return &descheduler{ rs: rs, ir: ir, @@ -172,6 +185,7 @@ func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedu eventRecorder: eventRecorder, podEvictor: podEvictor, podEvictionReactionFnc: podEvictionReactionFnc, + metricsCollector: metricsCollector, }, nil } @@ -251,6 +265,7 @@ func (d *descheduler) runProfiles(ctx context.Context, client clientset.Interfac frameworkprofile.WithSharedInformerFactory(d.sharedInformerFactory), frameworkprofile.WithPodEvictor(d.podEvictor), frameworkprofile.WithGetPodsAssignedToNodeFnc(d.getPodsAssignedToNode), + frameworkprofile.WithMetricsCollector(d.metricsCollector), ) if err != nil { klog.ErrorS(err, "unable to create a profile", "profile", profile.Name) @@ -315,6 +330,14 @@ func Run(ctx context.Context, rs *options.DeschedulerServer) error { return err } + if deschedulerPolicy.MetricsCollector.Enabled { + metricsClient, err := client.CreateMetricsClient(clientConnection, "descheduler") + if err != nil { + return err + } + rs.MetricsClient = metricsClient + } + runFn := func() error { return RunDeschedulerStrategies(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion) } @@ -423,6 +446,20 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer sharedInformerFactory.WaitForCacheSync(ctx.Done()) descheduler.podEvictor.WaitForEventHandlersSync(ctx) + if deschedulerPolicy.MetricsCollector.Enabled { + go func() { + klog.V(2).Infof("Starting metrics collector") + descheduler.metricsCollector.Run(ctx) + 
klog.V(2).Infof("Stopped metrics collector") + }() + klog.V(2).Infof("Waiting for metrics collector to sync") + if err := wait.PollWithContext(ctx, time.Second, time.Minute, func(context.Context) (done bool, err error) { + return descheduler.metricsCollector.HasSynced(), nil + }); err != nil { + return fmt.Errorf("unable to wait for metrics collector to sync: %v", err) + } + } + wait.NonSlidingUntil(func() { // A next context is created here intentionally to avoid nesting the spans via context. sCtx, sSpan := tracing.Tracer().Start(ctx, "NonSlidingUntil") diff --git a/pkg/descheduler/descheduler_test.go b/pkg/descheduler/descheduler_test.go index 22f8f7cb3..1591444b6 100644 --- a/pkg/descheduler/descheduler_test.go +++ b/pkg/descheduler/descheduler_test.go @@ -14,6 +14,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" apiversion "k8s.io/apimachinery/pkg/version" fakediscovery "k8s.io/client-go/discovery/fake" "k8s.io/client-go/informers" @@ -21,13 +22,18 @@ import ( core "k8s.io/client-go/testing" "k8s.io/component-base/featuregate" "k8s.io/klog/v2" + "k8s.io/metrics/pkg/apis/metrics/v1beta1" + metricsclient "k8s.io/metrics/pkg/client/clientset/versioned" + fakemetricsclient "k8s.io/metrics/pkg/client/clientset/versioned/fake" utilptr "k8s.io/utils/ptr" + "sigs.k8s.io/descheduler/cmd/descheduler/app/options" "sigs.k8s.io/descheduler/pkg/api" "sigs.k8s.io/descheduler/pkg/descheduler/evictions" "sigs.k8s.io/descheduler/pkg/features" "sigs.k8s.io/descheduler/pkg/framework/pluginregistry" "sigs.k8s.io/descheduler/pkg/framework/plugins/defaultevictor" + "sigs.k8s.io/descheduler/pkg/framework/plugins/nodeutilization" "sigs.k8s.io/descheduler/pkg/framework/plugins/removeduplicates" "sigs.k8s.io/descheduler/pkg/framework/plugins/removepodsviolatingnodetaints" "sigs.k8s.io/descheduler/pkg/utils" @@ -45,6 +51,8 @@ var ( Message: "admission webhook \"virt-launcher-eviction-interceptor.kubevirt.io\" denied the request: Eviction triggered evacuation of VMI", }, } + nodesgvr = schema.GroupVersionResource{Group: "metrics.k8s.io", Version: "v1beta1", Resource: "nodes"} + podsgvr = schema.GroupVersionResource{Group: "metrics.k8s.io", Version: "v1beta1", Resource: "pods"} ) func initFeatureGates() featuregate.FeatureGate { @@ -60,6 +68,7 @@ func initPluginRegistry() { pluginregistry.Register(removeduplicates.PluginName, removeduplicates.New, &removeduplicates.RemoveDuplicates{}, &removeduplicates.RemoveDuplicatesArgs{}, removeduplicates.ValidateRemoveDuplicatesArgs, removeduplicates.SetDefaults_RemoveDuplicatesArgs, pluginregistry.PluginRegistry) pluginregistry.Register(defaultevictor.PluginName, defaultevictor.New, &defaultevictor.DefaultEvictor{}, &defaultevictor.DefaultEvictorArgs{}, defaultevictor.ValidateDefaultEvictorArgs, defaultevictor.SetDefaults_DefaultEvictorArgs, pluginregistry.PluginRegistry) pluginregistry.Register(removepodsviolatingnodetaints.PluginName, removepodsviolatingnodetaints.New, &removepodsviolatingnodetaints.RemovePodsViolatingNodeTaints{}, &removepodsviolatingnodetaints.RemovePodsViolatingNodeTaintsArgs{}, removepodsviolatingnodetaints.ValidateRemovePodsViolatingNodeTaintsArgs, removepodsviolatingnodetaints.SetDefaults_RemovePodsViolatingNodeTaintsArgs, pluginregistry.PluginRegistry) + pluginregistry.Register(nodeutilization.LowNodeUtilizationPluginName, nodeutilization.NewLowNodeUtilization, &nodeutilization.LowNodeUtilization{}, 
&nodeutilization.LowNodeUtilizationArgs{}, nodeutilization.ValidateLowNodeUtilizationArgs, nodeutilization.SetDefaults_LowNodeUtilizationArgs, pluginregistry.PluginRegistry) } func removePodsViolatingNodeTaintsPolicy() *api.DeschedulerPolicy { @@ -126,7 +135,45 @@ func removeDuplicatesPolicy() *api.DeschedulerPolicy { } } -func initDescheduler(t *testing.T, ctx context.Context, featureGates featuregate.FeatureGate, internalDeschedulerPolicy *api.DeschedulerPolicy, objects ...runtime.Object) (*options.DeschedulerServer, *descheduler, *fakeclientset.Clientset) { +func lowNodeUtilizationPolicy(thresholds, targetThresholds api.ResourceThresholds, metricsEnabled bool) *api.DeschedulerPolicy { + return &api.DeschedulerPolicy{ + Profiles: []api.DeschedulerProfile{ + { + Name: "Profile", + PluginConfigs: []api.PluginConfig{ + { + Name: nodeutilization.LowNodeUtilizationPluginName, + Args: &nodeutilization.LowNodeUtilizationArgs{ + Thresholds: thresholds, + TargetThresholds: targetThresholds, + MetricsUtilization: nodeutilization.MetricsUtilization{ + MetricsServer: metricsEnabled, + }, + }, + }, + { + Name: defaultevictor.PluginName, + Args: &defaultevictor.DefaultEvictorArgs{}, + }, + }, + Plugins: api.Plugins{ + Filter: api.PluginSet{ + Enabled: []string{ + defaultevictor.PluginName, + }, + }, + Balance: api.PluginSet{ + Enabled: []string{ + nodeutilization.LowNodeUtilizationPluginName, + }, + }, + }, + }, + }, + } +} + +func initDescheduler(t *testing.T, ctx context.Context, featureGates featuregate.FeatureGate, internalDeschedulerPolicy *api.DeschedulerPolicy, metricsClient metricsclient.Interface, objects ...runtime.Object) (*options.DeschedulerServer, *descheduler, *fakeclientset.Clientset) { client := fakeclientset.NewSimpleClientset(objects...) eventClient := fakeclientset.NewSimpleClientset(objects...) 
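For orientation, the policy built programmatically by the `lowNodeUtilizationPolicy` helper above (with metrics enabled) corresponds roughly to the following v1alpha2 policy file. This is an illustrative sketch assembled from the README examples in this patch, not part of the change itself:

```yaml
apiVersion: "descheduler/v1alpha2"
kind: "DeschedulerPolicy"
metricsCollector:
  enabled: true
profiles:
  - name: Profile
    pluginConfig:
      - name: "LowNodeUtilization"
        args:
          thresholds:
            "cpu": 30
            "pods": 30
          targetThresholds:
            "cpu": 50
            "pods": 50
          metricsUtilization:
            metricsServer: true
      - name: "DefaultEvictor"
    plugins:
      filter:
        enabled:
          - "DefaultEvictor"
      balance:
        enabled:
          - "LowNodeUtilization"
```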
@@ -137,6 +184,7 @@ func initDescheduler(t *testing.T, ctx context.Context, featureGates featuregate rs.Client = client rs.EventClient = eventClient rs.DefaultFeatureGates = featureGates + rs.MetricsClient = metricsClient sharedInformerFactory := informers.NewSharedInformerFactoryWithOptions(rs.Client, 0, informers.WithTransform(trimManagedFields)) eventBroadcaster, eventRecorder := utils.GetRecorderAndBroadcaster(ctx, client) @@ -441,7 +489,7 @@ func TestPodEvictorReset(t *testing.T) { internalDeschedulerPolicy := removePodsViolatingNodeTaintsPolicy() ctxCancel, cancel := context.WithCancel(ctx) - rs, descheduler, client := initDescheduler(t, ctxCancel, initFeatureGates(), internalDeschedulerPolicy, node1, node2, p1, p2) + rs, descheduler, client := initDescheduler(t, ctxCancel, initFeatureGates(), internalDeschedulerPolicy, nil, node1, node2, p1, p2) defer cancel() var evictedPods []string @@ -543,7 +591,7 @@ func TestEvictionRequestsCache(t *testing.T) { featureGates.Add(map[featuregate.Feature]featuregate.FeatureSpec{ features.EvictionsInBackground: {Default: true, PreRelease: featuregate.Alpha}, }) - _, descheduler, client := initDescheduler(t, ctxCancel, featureGates, internalDeschedulerPolicy, node1, node2, p1, p2, p3, p4) + _, descheduler, client := initDescheduler(t, ctxCancel, featureGates, internalDeschedulerPolicy, nil, node1, node2, p1, p2, p3, p4) defer cancel() var fakeEvictedPods []string @@ -685,7 +733,7 @@ func TestDeschedulingLimits(t *testing.T) { featureGates.Add(map[featuregate.Feature]featuregate.FeatureSpec{ features.EvictionsInBackground: {Default: true, PreRelease: featuregate.Alpha}, }) - _, descheduler, client := initDescheduler(t, ctxCancel, featureGates, tc.policy, node1, node2) + _, descheduler, client := initDescheduler(t, ctxCancel, featureGates, tc.policy, nil, node1, node2) defer cancel() var fakeEvictedPods []string @@ -737,3 +785,81 @@ func TestDeschedulingLimits(t *testing.T) { }) } } + +func TestLoadAwareDescheduling(t *testing.T) { + initPluginRegistry() + + ownerRef1 := test.GetReplicaSetOwnerRefList() + updatePod := func(pod *v1.Pod) { + pod.ObjectMeta.OwnerReferences = ownerRef1 + } + + ctx := context.Background() + node1 := test.BuildTestNode("n1", 2000, 3000, 10, taintNodeNoSchedule) + node2 := test.BuildTestNode("n2", 2000, 3000, 10, nil) + nodes := []*v1.Node{node1, node2} + + p1 := test.BuildTestPod("p1", 300, 0, node1.Name, updatePod) + p2 := test.BuildTestPod("p2", 300, 0, node1.Name, updatePod) + p3 := test.BuildTestPod("p3", 300, 0, node1.Name, updatePod) + p4 := test.BuildTestPod("p4", 300, 0, node1.Name, updatePod) + p5 := test.BuildTestPod("p5", 300, 0, node1.Name, updatePod) + + nodemetricses := []*v1beta1.NodeMetrics{ + test.BuildNodeMetrics("n1", 2400, 3000), + test.BuildNodeMetrics("n2", 400, 0), + } + + podmetricses := []*v1beta1.PodMetrics{ + test.BuildPodMetrics("p1", 400, 0), + test.BuildPodMetrics("p2", 400, 0), + test.BuildPodMetrics("p3", 400, 0), + test.BuildPodMetrics("p4", 400, 0), + test.BuildPodMetrics("p5", 400, 0), + } + + metricsClientset := fakemetricsclient.NewSimpleClientset() + for _, nodemetrics := range nodemetricses { + metricsClientset.Tracker().Create(nodesgvr, nodemetrics, "") + } + for _, podmetrics := range podmetricses { + metricsClientset.Tracker().Create(podsgvr, podmetrics, podmetrics.Namespace) + } + + policy := lowNodeUtilizationPolicy( + api.ResourceThresholds{ + v1.ResourceCPU: 30, + v1.ResourcePods: 30, + }, + api.ResourceThresholds{ + v1.ResourceCPU: 50, + v1.ResourcePods: 50, + }, + true, // 
enabled metrics utilization + ) + policy.MetricsCollector.Enabled = true + + ctxCancel, cancel := context.WithCancel(ctx) + _, descheduler, _ := initDescheduler( + t, + ctxCancel, + initFeatureGates(), + policy, + metricsClientset, + node1, node2, p1, p2, p3, p4, p5) + defer cancel() + + // This needs to be run since the metrics collector is started + // after newDescheduler in RunDeschedulerStrategies. + descheduler.metricsCollector.Collect(ctx) + + err := descheduler.runDeschedulerLoop(ctx, nodes) + if err != nil { + t.Fatalf("Unable to run a descheduling loop: %v", err) + } + totalEs := descheduler.podEvictor.TotalEvicted() + if totalEs != 2 { + t.Fatalf("Expected %v evictions in total, got %v instead", 2, totalEs) + } + t.Logf("Total evictions: %v", totalEs) +} diff --git a/pkg/descheduler/metricscollector/metricscollector.go b/pkg/descheduler/metricscollector/metricscollector.go new file mode 100644 index 000000000..5c49cec11 --- /dev/null +++ b/pkg/descheduler/metricscollector/metricscollector.go @@ -0,0 +1,151 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metricscollector + +import ( + "context" + "fmt" + "math" + "sync" + "time" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/wait" + listercorev1 "k8s.io/client-go/listers/core/v1" + "k8s.io/klog/v2" + metricsclient "k8s.io/metrics/pkg/client/clientset/versioned" + utilptr "k8s.io/utils/ptr" +) + +const ( + beta float64 = 0.9 +) + +type MetricsCollector struct { + nodeLister listercorev1.NodeLister + metricsClientset metricsclient.Interface + nodeSelector labels.Selector + + nodes map[string]map[v1.ResourceName]*resource.Quantity + + mu sync.RWMutex + // hasSynced signals at least one sync succeeded + hasSynced bool +} + +func NewMetricsCollector(nodeLister listercorev1.NodeLister, metricsClientset metricsclient.Interface, nodeSelector labels.Selector) *MetricsCollector { + return &MetricsCollector{ + nodeLister: nodeLister, + metricsClientset: metricsClientset, + nodeSelector: nodeSelector, + nodes: make(map[string]map[v1.ResourceName]*resource.Quantity), + } +} + +func (mc *MetricsCollector) Run(ctx context.Context) { + wait.NonSlidingUntil(func() { + mc.Collect(ctx) + }, 5*time.Second, ctx.Done()) +} + +// During experiments rounding to int error causes weightedAverage to never +// reach value even when weightedAverage is repeated many times in a row. +// The difference between the limit and computed average stops within 5 units. +// Nevertheless, the value is expected to change in time. So the weighted +// average nevers gets a chance to converge. Which makes the computed +// error negligible. +// The speed of convergence depends on how often the metrics collector +// syncs with the current value. Currently, the interval is set to 5s. 
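+//
+// For illustration, with beta=0.9: starting from a smoothed CPU value of 1400m,
+// a new sample of 500m yields 0.9*1400 + 0.1*500 = 1310m, and a following sample
+// of 900m yields 0.9*1310 + 0.1*900 = 1269m (the values asserted in
+// metricscollector_test.go), i.e. each collection closes roughly 10% of the
+// remaining gap to the latest reading.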
+func weightedAverage(prevValue, value int64) int64 { + return int64(math.Round(beta*float64(prevValue) + (1-beta)*float64(value))) +} + +func (mc *MetricsCollector) AllNodesUsage() (map[string]map[v1.ResourceName]*resource.Quantity, error) { + mc.mu.RLock() + defer mc.mu.RUnlock() + + allNodesUsage := make(map[string]map[v1.ResourceName]*resource.Quantity) + for nodeName := range mc.nodes { + allNodesUsage[nodeName] = map[v1.ResourceName]*resource.Quantity{ + v1.ResourceCPU: utilptr.To[resource.Quantity](mc.nodes[nodeName][v1.ResourceCPU].DeepCopy()), + v1.ResourceMemory: utilptr.To[resource.Quantity](mc.nodes[nodeName][v1.ResourceMemory].DeepCopy()), + } + } + + return allNodesUsage, nil +} + +func (mc *MetricsCollector) NodeUsage(node *v1.Node) (map[v1.ResourceName]*resource.Quantity, error) { + mc.mu.RLock() + defer mc.mu.RUnlock() + + if _, exists := mc.nodes[node.Name]; !exists { + klog.V(4).InfoS("unable to find node in the collected metrics", "node", klog.KObj(node)) + return nil, fmt.Errorf("unable to find node %q in the collected metrics", node.Name) + } + return map[v1.ResourceName]*resource.Quantity{ + v1.ResourceCPU: utilptr.To[resource.Quantity](mc.nodes[node.Name][v1.ResourceCPU].DeepCopy()), + v1.ResourceMemory: utilptr.To[resource.Quantity](mc.nodes[node.Name][v1.ResourceMemory].DeepCopy()), + }, nil +} + +func (mc *MetricsCollector) HasSynced() bool { + return mc.hasSynced +} + +func (mc *MetricsCollector) MetricsClient() metricsclient.Interface { + return mc.metricsClientset +} + +func (mc *MetricsCollector) Collect(ctx context.Context) error { + mc.mu.Lock() + defer mc.mu.Unlock() + nodes, err := mc.nodeLister.List(mc.nodeSelector) + if err != nil { + return fmt.Errorf("unable to list nodes: %v", err) + } + + for _, node := range nodes { + metrics, err := mc.metricsClientset.MetricsV1beta1().NodeMetricses().Get(ctx, node.Name, metav1.GetOptions{}) + if err != nil { + klog.ErrorS(err, "Error fetching metrics", "node", node.Name) + // No entry -> duplicate the previous value -> do nothing as beta*PV + (1-beta)*PV = PV + continue + } + + if _, exists := mc.nodes[node.Name]; !exists { + mc.nodes[node.Name] = map[v1.ResourceName]*resource.Quantity{ + v1.ResourceCPU: utilptr.To[resource.Quantity](metrics.Usage.Cpu().DeepCopy()), + v1.ResourceMemory: utilptr.To[resource.Quantity](metrics.Usage.Memory().DeepCopy()), + } + } else { + // get MilliValue to reduce loss of precision + mc.nodes[node.Name][v1.ResourceCPU].SetMilli( + weightedAverage(mc.nodes[node.Name][v1.ResourceCPU].MilliValue(), metrics.Usage.Cpu().MilliValue()), + ) + mc.nodes[node.Name][v1.ResourceMemory].Set( + weightedAverage(mc.nodes[node.Name][v1.ResourceMemory].Value(), metrics.Usage.Memory().Value()), + ) + } + } + + mc.hasSynced = true + return nil +} diff --git a/pkg/descheduler/metricscollector/metricscollector_test.go b/pkg/descheduler/metricscollector/metricscollector_test.go new file mode 100644 index 000000000..c1477172e --- /dev/null +++ b/pkg/descheduler/metricscollector/metricscollector_test.go @@ -0,0 +1,141 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metricscollector + +import ( + "context" + "math" + "testing" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/informers" + fakeclientset "k8s.io/client-go/kubernetes/fake" + fakemetricsclient "k8s.io/metrics/pkg/client/clientset/versioned/fake" + + "sigs.k8s.io/descheduler/test" +) + +func checkCpuNodeUsage(t *testing.T, usage map[v1.ResourceName]*resource.Quantity, millicpu int64) { + t.Logf("current node cpu usage: %v\n", usage[v1.ResourceCPU].MilliValue()) + if usage[v1.ResourceCPU].MilliValue() != millicpu { + t.Fatalf("cpu node usage expected to be %v, got %v instead", millicpu, usage[v1.ResourceCPU].MilliValue()) + } +} + +func TestMetricsCollector(t *testing.T) { + gvr := schema.GroupVersionResource{Group: "metrics.k8s.io", Version: "v1beta1", Resource: "nodes"} + + n1 := test.BuildTestNode("n1", 2000, 3000, 10, nil) + n2 := test.BuildTestNode("n2", 2000, 3000, 10, nil) + n3 := test.BuildTestNode("n3", 2000, 3000, 10, nil) + + n1metrics := test.BuildNodeMetrics("n1", 400, 1714978816) + n2metrics := test.BuildNodeMetrics("n2", 1400, 1714978816) + n3metrics := test.BuildNodeMetrics("n3", 300, 1714978816) + + clientset := fakeclientset.NewSimpleClientset(n1, n2, n3) + metricsClientset := fakemetricsclient.NewSimpleClientset() + metricsClientset.Tracker().Create(gvr, n1metrics, "") + metricsClientset.Tracker().Create(gvr, n2metrics, "") + metricsClientset.Tracker().Create(gvr, n3metrics, "") + + ctx := context.TODO() + sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0) + nodeLister := sharedInformerFactory.Core().V1().Nodes().Lister() + sharedInformerFactory.Start(ctx.Done()) + sharedInformerFactory.WaitForCacheSync(ctx.Done()) + + t.Logf("Set initial node cpu usage to 1400") + collector := NewMetricsCollector(nodeLister, metricsClientset, labels.Everything()) + collector.Collect(context.TODO()) + nodesUsage, _ := collector.NodeUsage(n2) + checkCpuNodeUsage(t, nodesUsage, 1400) + allnodesUsage, _ := collector.AllNodesUsage() + checkCpuNodeUsage(t, allnodesUsage[n2.Name], 1400) + + t.Logf("Set current node cpu usage to 500") + n2metrics.Usage[v1.ResourceCPU] = *resource.NewMilliQuantity(500, resource.DecimalSI) + metricsClientset.Tracker().Update(gvr, n2metrics, "") + collector.Collect(context.TODO()) + nodesUsage, _ = collector.NodeUsage(n2) + checkCpuNodeUsage(t, nodesUsage, 1310) + allnodesUsage, _ = collector.AllNodesUsage() + checkCpuNodeUsage(t, allnodesUsage[n2.Name], 1310) + + t.Logf("Set current node cpu usage to 900") + n2metrics.Usage[v1.ResourceCPU] = *resource.NewMilliQuantity(900, resource.DecimalSI) + metricsClientset.Tracker().Update(gvr, n2metrics, "") + collector.Collect(context.TODO()) + nodesUsage, _ = collector.NodeUsage(n2) + checkCpuNodeUsage(t, nodesUsage, 1269) + allnodesUsage, _ = collector.AllNodesUsage() + checkCpuNodeUsage(t, allnodesUsage[n2.Name], 1269) +} + +func TestMetricsCollectorConvergence(t *testing.T) { + gvr := schema.GroupVersionResource{Group: "metrics.k8s.io", Version: "v1beta1", Resource: "nodes"} + + n1 := test.BuildTestNode("n1", 2000, 3000, 10, nil) + n2 := test.BuildTestNode("n2", 2000, 3000, 10, nil) + n3 := test.BuildTestNode("n3", 2000, 3000, 10, nil) + + n1metrics := test.BuildNodeMetrics("n1", 400, 1714978816) + n2metrics := test.BuildNodeMetrics("n2", 1400, 1714978816) + n3metrics 
:= test.BuildNodeMetrics("n3", 300, 1714978816) + + clientset := fakeclientset.NewSimpleClientset(n1, n2, n3) + metricsClientset := fakemetricsclient.NewSimpleClientset() + metricsClientset.Tracker().Create(gvr, n1metrics, "") + metricsClientset.Tracker().Create(gvr, n2metrics, "") + metricsClientset.Tracker().Create(gvr, n3metrics, "") + + ctx := context.TODO() + sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0) + nodeLister := sharedInformerFactory.Core().V1().Nodes().Lister() + sharedInformerFactory.Start(ctx.Done()) + sharedInformerFactory.WaitForCacheSync(ctx.Done()) + + t.Logf("Set initial node cpu usage to 1400") + collector := NewMetricsCollector(nodeLister, metricsClientset, labels.Everything()) + collector.Collect(context.TODO()) + nodesUsage, _ := collector.NodeUsage(n2) + checkCpuNodeUsage(t, nodesUsage, 1400) + allnodesUsage, _ := collector.AllNodesUsage() + checkCpuNodeUsage(t, allnodesUsage[n2.Name], 1400) + + t.Logf("Set current node cpu/memory usage to 900/1614978816 and wait until it converges to it") + n2metrics.Usage[v1.ResourceCPU] = *resource.NewMilliQuantity(900, resource.DecimalSI) + n2metrics.Usage[v1.ResourceMemory] = *resource.NewQuantity(1614978816, resource.BinarySI) + metricsClientset.Tracker().Update(gvr, n2metrics, "") + converged := false + for i := 0; i < 300; i++ { + collector.Collect(context.TODO()) + nodesUsage, _ = collector.NodeUsage(n2) + if math.Abs(float64(900-nodesUsage[v1.ResourceCPU].MilliValue())) < 6 && math.Abs(float64(1614978816-nodesUsage[v1.ResourceMemory].Value())) < 6 { + t.Logf("Node cpu/memory usage converged to 900+-5/1614978816+-5") + converged = true + break + } + t.Logf("The current node usage: cpu=%v, memory=%v", nodesUsage[v1.ResourceCPU].MilliValue(), nodesUsage[v1.ResourceMemory].Value()) + } + if !converged { + t.Fatalf("The node usage did not converged to 900+-1") + } +} diff --git a/pkg/framework/fake/fake.go b/pkg/framework/fake/fake.go index a16132d40..d27422893 100644 --- a/pkg/framework/fake/fake.go +++ b/pkg/framework/fake/fake.go @@ -8,6 +8,7 @@ import ( clientset "k8s.io/client-go/kubernetes" "sigs.k8s.io/descheduler/pkg/descheduler/evictions" + "sigs.k8s.io/descheduler/pkg/descheduler/metricscollector" podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod" frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types" ) @@ -18,6 +19,7 @@ type HandleImpl struct { SharedInformerFactoryImpl informers.SharedInformerFactory EvictorFilterImpl frameworktypes.EvictorPlugin PodEvictorImpl *evictions.PodEvictor + MetricsCollectorImpl *metricscollector.MetricsCollector } var _ frameworktypes.Handle = &HandleImpl{} @@ -26,6 +28,10 @@ func (hi *HandleImpl) ClientSet() clientset.Interface { return hi.ClientsetImpl } +func (hi *HandleImpl) MetricsCollector() *metricscollector.MetricsCollector { + return hi.MetricsCollectorImpl +} + func (hi *HandleImpl) GetPodsAssignedToNodeFunc() podutil.GetPodsAssignedToNodeFunc { return hi.GetPodsAssignedToNodeFuncImpl } diff --git a/pkg/framework/plugins/nodeutilization/lownodeutilization.go b/pkg/framework/plugins/nodeutilization/lownodeutilization.go index df3b84743..8abb48465 100644 --- a/pkg/framework/plugins/nodeutilization/lownodeutilization.go +++ b/pkg/framework/plugins/nodeutilization/lownodeutilization.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/runtime" "k8s.io/klog/v2" + "sigs.k8s.io/descheduler/pkg/api" "sigs.k8s.io/descheduler/pkg/descheduler/evictions" nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node" @@ 
-88,6 +89,16 @@ func NewLowNodeUtilization(args runtime.Object, handle frameworktypes.Handle) (f resourceNames := getResourceNames(lowNodeUtilizationArgsArgs.Thresholds) + var usageClient usageClient + if lowNodeUtilizationArgsArgs.MetricsUtilization.MetricsServer { + if handle.MetricsCollector() == nil { + return nil, fmt.Errorf("metrics client not initialized") + } + usageClient = newActualUsageClient(resourceNames, handle.GetPodsAssignedToNodeFunc(), handle.MetricsCollector()) + } else { + usageClient = newRequestedUsageClient(resourceNames, handle.GetPodsAssignedToNodeFunc()) + } + return &LowNodeUtilization{ handle: handle, args: lowNodeUtilizationArgsArgs, @@ -95,7 +106,7 @@ func NewLowNodeUtilization(args runtime.Object, handle frameworktypes.Handle) (f overutilizationCriteria: overutilizationCriteria, resourceNames: resourceNames, podFilter: podFilter, - usageClient: newRequestedUsageClient(resourceNames, handle.GetPodsAssignedToNodeFunc()), + usageClient: usageClient, }, nil } diff --git a/pkg/framework/plugins/nodeutilization/lownodeutilization_test.go b/pkg/framework/plugins/nodeutilization/lownodeutilization_test.go index a99acc7b6..7a8c441ce 100644 --- a/pkg/framework/plugins/nodeutilization/lownodeutilization_test.go +++ b/pkg/framework/plugins/nodeutilization/lownodeutilization_test.go @@ -21,19 +21,23 @@ import ( "fmt" "testing" - "sigs.k8s.io/descheduler/pkg/api" - "sigs.k8s.io/descheduler/pkg/framework/plugins/defaultevictor" - frameworktesting "sigs.k8s.io/descheduler/pkg/framework/testing" - frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types" - v1 "k8s.io/api/core/v1" policy "k8s.io/api/policy/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" + "k8s.io/metrics/pkg/apis/metrics/v1beta1" + fakemetricsclient "k8s.io/metrics/pkg/client/clientset/versioned/fake" + "sigs.k8s.io/descheduler/pkg/api" "sigs.k8s.io/descheduler/pkg/descheduler/evictions" + "sigs.k8s.io/descheduler/pkg/descheduler/metricscollector" + "sigs.k8s.io/descheduler/pkg/framework/plugins/defaultevictor" + frameworktesting "sigs.k8s.io/descheduler/pkg/framework/testing" + frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types" "sigs.k8s.io/descheduler/pkg/utils" "sigs.k8s.io/descheduler/test" ) @@ -48,14 +52,17 @@ func TestLowNodeUtilization(t *testing.T) { notMatchingNodeSelectorValue := "east" testCases := []struct { - name string - useDeviationThresholds bool - thresholds, targetThresholds api.ResourceThresholds - nodes []*v1.Node - pods []*v1.Pod - expectedPodsEvicted uint - evictedPods []string - evictableNamespaces *api.Namespaces + name string + useDeviationThresholds bool + thresholds, targetThresholds api.ResourceThresholds + nodes []*v1.Node + pods []*v1.Pod + nodemetricses []*v1beta1.NodeMetrics + podmetricses []*v1beta1.PodMetrics + expectedPodsEvicted uint + expectedPodsWithMetricsEvicted uint + evictedPods []string + evictableNamespaces *api.Namespaces }{ { name: "no evictable pods", @@ -103,7 +110,20 @@ func TestLowNodeUtilization(t *testing.T) { }), test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef), }, - expectedPodsEvicted: 0, + nodemetricses: []*v1beta1.NodeMetrics{ + test.BuildNodeMetrics(n1NodeName, 2401, 1714978816), + test.BuildNodeMetrics(n2NodeName, 401, 1714978816), + test.BuildNodeMetrics(n3NodeName, 10, 1714978816), + }, + podmetricses: []*v1beta1.PodMetrics{ + test.BuildPodMetrics("p1", 401, 0), 
+ test.BuildPodMetrics("p2", 401, 0), + test.BuildPodMetrics("p3", 401, 0), + test.BuildPodMetrics("p4", 401, 0), + test.BuildPodMetrics("p5", 401, 0), + }, + expectedPodsEvicted: 0, + expectedPodsWithMetricsEvicted: 0, }, { name: "without priorities", @@ -153,7 +173,20 @@ func TestLowNodeUtilization(t *testing.T) { }), test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef), }, - expectedPodsEvicted: 4, + nodemetricses: []*v1beta1.NodeMetrics{ + test.BuildNodeMetrics(n1NodeName, 3201, 0), + test.BuildNodeMetrics(n2NodeName, 401, 0), + test.BuildNodeMetrics(n3NodeName, 11, 0), + }, + podmetricses: []*v1beta1.PodMetrics{ + test.BuildPodMetrics("p1", 401, 0), + test.BuildPodMetrics("p2", 401, 0), + test.BuildPodMetrics("p3", 401, 0), + test.BuildPodMetrics("p4", 401, 0), + test.BuildPodMetrics("p5", 401, 0), + }, + expectedPodsEvicted: 4, + expectedPodsWithMetricsEvicted: 4, }, { name: "without priorities, but excluding namespaces", @@ -218,12 +251,25 @@ func TestLowNodeUtilization(t *testing.T) { }), test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef), }, + nodemetricses: []*v1beta1.NodeMetrics{ + test.BuildNodeMetrics(n1NodeName, 3201, 0), + test.BuildNodeMetrics(n2NodeName, 401, 0), + test.BuildNodeMetrics(n3NodeName, 11, 0), + }, + podmetricses: []*v1beta1.PodMetrics{ + test.BuildPodMetrics("p1", 401, 0), + test.BuildPodMetrics("p2", 401, 0), + test.BuildPodMetrics("p3", 401, 0), + test.BuildPodMetrics("p4", 401, 0), + test.BuildPodMetrics("p5", 401, 0), + }, evictableNamespaces: &api.Namespaces{ Exclude: []string{ "namespace1", }, }, - expectedPodsEvicted: 0, + expectedPodsEvicted: 0, + expectedPodsWithMetricsEvicted: 0, }, { name: "without priorities, but include only default namespace", @@ -283,12 +329,25 @@ func TestLowNodeUtilization(t *testing.T) { }), test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef), }, + nodemetricses: []*v1beta1.NodeMetrics{ + test.BuildNodeMetrics(n1NodeName, 3201, 0), + test.BuildNodeMetrics(n2NodeName, 401, 0), + test.BuildNodeMetrics(n3NodeName, 11, 0), + }, + podmetricses: []*v1beta1.PodMetrics{ + test.BuildPodMetrics("p1", 401, 0), + test.BuildPodMetrics("p2", 401, 0), + test.BuildPodMetrics("p3", 401, 0), + test.BuildPodMetrics("p4", 401, 0), + test.BuildPodMetrics("p5", 401, 0), + }, evictableNamespaces: &api.Namespaces{ Include: []string{ "default", }, }, - expectedPodsEvicted: 2, + expectedPodsEvicted: 2, + expectedPodsWithMetricsEvicted: 2, }, { name: "without priorities stop when cpu capacity is depleted", @@ -306,14 +365,14 @@ func TestLowNodeUtilization(t *testing.T) { test.BuildTestNode(n3NodeName, 4000, 3000, 10, test.SetNodeUnschedulable), }, pods: []*v1.Pod{ - test.BuildTestPod("p1", 400, 300, n1NodeName, test.SetRSOwnerRef), - test.BuildTestPod("p2", 400, 300, n1NodeName, test.SetRSOwnerRef), - test.BuildTestPod("p3", 400, 300, n1NodeName, test.SetRSOwnerRef), - test.BuildTestPod("p4", 400, 300, n1NodeName, test.SetRSOwnerRef), - test.BuildTestPod("p5", 400, 300, n1NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p1", 400, 0, n1NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p2", 400, 0, n1NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p3", 400, 0, n1NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p4", 400, 0, n1NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p5", 400, 0, n1NodeName, test.SetRSOwnerRef), // These won't be evicted. 
- test.BuildTestPod("p6", 400, 300, n1NodeName, test.SetDSOwnerRef), - test.BuildTestPod("p7", 400, 300, n1NodeName, func(pod *v1.Pod) { + test.BuildTestPod("p6", 400, 0, n1NodeName, test.SetDSOwnerRef), + test.BuildTestPod("p7", 400, 0, n1NodeName, func(pod *v1.Pod) { // A pod with local storage. test.SetNormalOwnerRef(pod) pod.Spec.Volumes = []v1.Volume{ @@ -330,17 +389,29 @@ func TestLowNodeUtilization(t *testing.T) { // A Mirror Pod. pod.Annotations = test.GetMirrorPodAnnotation() }), - test.BuildTestPod("p8", 400, 300, n1NodeName, func(pod *v1.Pod) { + test.BuildTestPod("p8", 400, 0, n1NodeName, func(pod *v1.Pod) { // A Critical Pod. test.SetNormalOwnerRef(pod) pod.Namespace = "kube-system" priority := utils.SystemCriticalPriority pod.Spec.Priority = &priority }), - test.BuildTestPod("p9", 400, 2100, n2NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef), }, - // 4 pods available for eviction based on v1.ResourcePods, only 3 pods can be evicted before cpu is depleted - expectedPodsEvicted: 3, + nodemetricses: []*v1beta1.NodeMetrics{ + test.BuildNodeMetrics(n1NodeName, 3201, 0), + test.BuildNodeMetrics(n2NodeName, 401, 0), + test.BuildNodeMetrics(n3NodeName, 0, 0), + }, + podmetricses: []*v1beta1.PodMetrics{ + test.BuildPodMetrics("p1", 401, 0), + test.BuildPodMetrics("p2", 401, 0), + test.BuildPodMetrics("p3", 401, 0), + test.BuildPodMetrics("p4", 401, 0), + test.BuildPodMetrics("p5", 401, 0), + }, + expectedPodsEvicted: 4, + expectedPodsWithMetricsEvicted: 4, }, { name: "with priorities", @@ -410,7 +481,20 @@ func TestLowNodeUtilization(t *testing.T) { }), test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef), }, - expectedPodsEvicted: 4, + nodemetricses: []*v1beta1.NodeMetrics{ + test.BuildNodeMetrics(n1NodeName, 3201, 0), + test.BuildNodeMetrics(n2NodeName, 401, 0), + test.BuildNodeMetrics(n3NodeName, 11, 0), + }, + podmetricses: []*v1beta1.PodMetrics{ + test.BuildPodMetrics("p1", 401, 0), + test.BuildPodMetrics("p2", 401, 0), + test.BuildPodMetrics("p3", 401, 0), + test.BuildPodMetrics("p4", 401, 0), + test.BuildPodMetrics("p5", 401, 0), + }, + expectedPodsEvicted: 4, + expectedPodsWithMetricsEvicted: 4, }, { name: "without priorities evicting best-effort pods only", @@ -478,8 +562,21 @@ func TestLowNodeUtilization(t *testing.T) { }), test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef), }, - expectedPodsEvicted: 4, - evictedPods: []string{"p1", "p2", "p4", "p5"}, + nodemetricses: []*v1beta1.NodeMetrics{ + test.BuildNodeMetrics(n1NodeName, 3201, 0), + test.BuildNodeMetrics(n2NodeName, 401, 0), + test.BuildNodeMetrics(n3NodeName, 11, 0), + }, + podmetricses: []*v1beta1.PodMetrics{ + test.BuildPodMetrics("p1", 401, 0), + test.BuildPodMetrics("p2", 401, 0), + test.BuildPodMetrics("p3", 401, 0), + test.BuildPodMetrics("p4", 401, 0), + test.BuildPodMetrics("p5", 401, 0), + }, + expectedPodsEvicted: 4, + expectedPodsWithMetricsEvicted: 4, + evictedPods: []string{"p1", "p2", "p4", "p5"}, }, { name: "with extended resource", @@ -558,8 +655,21 @@ func TestLowNodeUtilization(t *testing.T) { test.SetPodExtendedResourceRequest(pod, extendedResource, 1) }), }, + nodemetricses: []*v1beta1.NodeMetrics{ + test.BuildNodeMetrics(n1NodeName, 3201, 0), + test.BuildNodeMetrics(n2NodeName, 401, 0), + test.BuildNodeMetrics(n3NodeName, 11, 0), + }, + podmetricses: []*v1beta1.PodMetrics{ + test.BuildPodMetrics("p1", 401, 0), + test.BuildPodMetrics("p2", 401, 0), + test.BuildPodMetrics("p3", 401, 0), + test.BuildPodMetrics("p4", 401, 0), + 
test.BuildPodMetrics("p5", 401, 0), + }, // 4 pods available for eviction based on v1.ResourcePods, only 3 pods can be evicted before extended resource is depleted - expectedPodsEvicted: 3, + expectedPodsEvicted: 3, + expectedPodsWithMetricsEvicted: 0, }, { name: "with extended resource in some of nodes", @@ -586,8 +696,21 @@ func TestLowNodeUtilization(t *testing.T) { }), test.BuildTestPod("p9", 0, 0, n2NodeName, test.SetRSOwnerRef), }, + nodemetricses: []*v1beta1.NodeMetrics{ + test.BuildNodeMetrics(n1NodeName, 3201, 0), + test.BuildNodeMetrics(n2NodeName, 401, 0), + test.BuildNodeMetrics(n3NodeName, 11, 0), + }, + podmetricses: []*v1beta1.PodMetrics{ + test.BuildPodMetrics("p1", 401, 0), + test.BuildPodMetrics("p2", 401, 0), + test.BuildPodMetrics("p3", 401, 0), + test.BuildPodMetrics("p4", 401, 0), + test.BuildPodMetrics("p5", 401, 0), + }, // 0 pods available for eviction because there's no enough extended resource in node2 - expectedPodsEvicted: 0, + expectedPodsEvicted: 0, + expectedPodsWithMetricsEvicted: 0, }, { name: "without priorities, but only other node is unschedulable", @@ -636,7 +759,19 @@ func TestLowNodeUtilization(t *testing.T) { pod.Spec.Priority = &priority }), }, - expectedPodsEvicted: 0, + nodemetricses: []*v1beta1.NodeMetrics{ + test.BuildNodeMetrics(n1NodeName, 3201, 0), + test.BuildNodeMetrics(n2NodeName, 401, 0), + }, + podmetricses: []*v1beta1.PodMetrics{ + test.BuildPodMetrics("p1", 401, 0), + test.BuildPodMetrics("p2", 401, 0), + test.BuildPodMetrics("p3", 401, 0), + test.BuildPodMetrics("p4", 401, 0), + test.BuildPodMetrics("p5", 401, 0), + }, + expectedPodsEvicted: 0, + expectedPodsWithMetricsEvicted: 0, }, { name: "without priorities, but only other node doesn't match pod node selector for p4 and p5", @@ -701,7 +836,17 @@ func TestLowNodeUtilization(t *testing.T) { pod.Spec.Priority = &priority }), }, - expectedPodsEvicted: 3, + nodemetricses: []*v1beta1.NodeMetrics{ + test.BuildNodeMetrics(n1NodeName, 3201, 0), + test.BuildNodeMetrics(n2NodeName, 401, 0), + }, + podmetricses: []*v1beta1.PodMetrics{ + test.BuildPodMetrics("p1", 401, 0), + test.BuildPodMetrics("p2", 401, 0), + test.BuildPodMetrics("p3", 401, 0), + }, + expectedPodsEvicted: 3, + expectedPodsWithMetricsEvicted: 3, }, { name: "without priorities, but only other node doesn't match pod node affinity for p4 and p5", @@ -795,7 +940,17 @@ func TestLowNodeUtilization(t *testing.T) { }), test.BuildTestPod("p9", 0, 0, n2NodeName, test.SetRSOwnerRef), }, - expectedPodsEvicted: 3, + nodemetricses: []*v1beta1.NodeMetrics{ + test.BuildNodeMetrics(n1NodeName, 3201, 0), + test.BuildNodeMetrics(n2NodeName, 401, 0), + }, + podmetricses: []*v1beta1.PodMetrics{ + test.BuildPodMetrics("p1", 401, 0), + test.BuildPodMetrics("p2", 401, 0), + test.BuildPodMetrics("p3", 401, 0), + }, + expectedPodsEvicted: 3, + expectedPodsWithMetricsEvicted: 3, }, { name: "deviation thresholds", @@ -847,71 +1002,219 @@ func TestLowNodeUtilization(t *testing.T) { }), test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef), }, - expectedPodsEvicted: 2, - evictedPods: []string{}, + nodemetricses: []*v1beta1.NodeMetrics{ + test.BuildNodeMetrics(n1NodeName, 3201, 0), + test.BuildNodeMetrics(n2NodeName, 401, 0), + test.BuildNodeMetrics(n3NodeName, 11, 0), + }, + podmetricses: []*v1beta1.PodMetrics{ + test.BuildPodMetrics("p1", 401, 0), + test.BuildPodMetrics("p2", 401, 0), + test.BuildPodMetrics("p3", 401, 0), + test.BuildPodMetrics("p4", 401, 0), + test.BuildPodMetrics("p5", 401, 0), + }, + expectedPodsEvicted: 2, + 
expectedPodsWithMetricsEvicted: 2, + evictedPods: []string{}, + }, + { + name: "without priorities different evictions for requested and actual resources", + thresholds: api.ResourceThresholds{ + v1.ResourceCPU: 30, + v1.ResourcePods: 30, + }, + targetThresholds: api.ResourceThresholds{ + v1.ResourceCPU: 50, + v1.ResourcePods: 50, + }, + nodes: []*v1.Node{ + test.BuildTestNode(n1NodeName, 4000, 3000, 9, nil), + test.BuildTestNode(n2NodeName, 4000, 3000, 10, func(node *v1.Node) { + node.ObjectMeta.Labels = map[string]string{ + nodeSelectorKey: notMatchingNodeSelectorValue, + } + }), + }, + pods: []*v1.Pod{ + test.BuildTestPod("p1", 400, 0, n1NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p2", 400, 0, n1NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p3", 400, 0, n1NodeName, test.SetRSOwnerRef), + // These won't be evicted. + test.BuildTestPod("p4", 400, 0, n1NodeName, func(pod *v1.Pod) { + // A pod with affinity to run in the "west" datacenter upon scheduling + test.SetNormalOwnerRef(pod) + pod.Spec.Affinity = &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: nodeSelectorKey, + Operator: "In", + Values: []string{nodeSelectorValue}, + }, + }, + }, + }, + }, + }, + } + }), + test.BuildTestPod("p5", 400, 0, n1NodeName, func(pod *v1.Pod) { + // A pod with affinity to run in the "west" datacenter upon scheduling + test.SetNormalOwnerRef(pod) + pod.Spec.Affinity = &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: nodeSelectorKey, + Operator: "In", + Values: []string{nodeSelectorValue}, + }, + }, + }, + }, + }, + }, + } + }), + test.BuildTestPod("p6", 400, 0, n1NodeName, test.SetDSOwnerRef), + test.BuildTestPod("p7", 400, 0, n1NodeName, func(pod *v1.Pod) { + // A pod with local storage. + test.SetNormalOwnerRef(pod) + pod.Spec.Volumes = []v1.Volume{ + { + Name: "sample", + VolumeSource: v1.VolumeSource{ + HostPath: &v1.HostPathVolumeSource{Path: "somePath"}, + EmptyDir: &v1.EmptyDirVolumeSource{ + SizeLimit: resource.NewQuantity(int64(10), resource.BinarySI), + }, + }, + }, + } + // A Mirror Pod. + pod.Annotations = test.GetMirrorPodAnnotation() + }), + test.BuildTestPod("p8", 400, 0, n1NodeName, func(pod *v1.Pod) { + // A Critical Pod. 
+ test.SetNormalOwnerRef(pod) + pod.Namespace = "kube-system" + priority := utils.SystemCriticalPriority + pod.Spec.Priority = &priority + }), + test.BuildTestPod("p9", 0, 0, n2NodeName, test.SetRSOwnerRef), + }, + nodemetricses: []*v1beta1.NodeMetrics{ + test.BuildNodeMetrics(n1NodeName, 3201, 0), + test.BuildNodeMetrics(n2NodeName, 401, 0), + }, + podmetricses: []*v1beta1.PodMetrics{ + test.BuildPodMetrics("p1", 801, 0), + test.BuildPodMetrics("p2", 801, 0), + test.BuildPodMetrics("p3", 801, 0), + }, + expectedPodsEvicted: 3, + expectedPodsWithMetricsEvicted: 2, }, } for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + testFnc := func(metricsEnabled bool, expectedPodsEvicted uint) func(t *testing.T) { + return func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - var objs []runtime.Object - for _, node := range tc.nodes { - objs = append(objs, node) - } - for _, pod := range tc.pods { - objs = append(objs, pod) - } - fakeClient := fake.NewSimpleClientset(objs...) + var objs []runtime.Object + for _, node := range tc.nodes { + objs = append(objs, node) + } + for _, pod := range tc.pods { + objs = append(objs, pod) + } - podsForEviction := make(map[string]struct{}) - for _, pod := range tc.evictedPods { - podsForEviction[pod] = struct{}{} - } + fakeClient := fake.NewSimpleClientset(objs...) - evictionFailed := false - if len(tc.evictedPods) > 0 { - fakeClient.Fake.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) { - getAction := action.(core.CreateAction) - obj := getAction.GetObject() - if eviction, ok := obj.(*policy.Eviction); ok { - if _, exists := podsForEviction[eviction.Name]; exists { - return true, obj, nil - } - evictionFailed = true - return true, nil, fmt.Errorf("pod %q was unexpectedly evicted", eviction.Name) + var collector *metricscollector.MetricsCollector + if metricsEnabled { + metricsClientset := fakemetricsclient.NewSimpleClientset() + for _, nodemetrics := range tc.nodemetricses { + metricsClientset.Tracker().Create(nodesgvr, nodemetrics, "") + } + for _, podmetrics := range tc.podmetricses { + metricsClientset.Tracker().Create(podsgvr, podmetrics, podmetrics.Namespace) } - return true, obj, nil - }) - } - handle, podEvictor, err := frameworktesting.InitFrameworkHandle(ctx, fakeClient, nil, defaultevictor.DefaultEvictorArgs{NodeFit: true}, nil) - if err != nil { - t.Fatalf("Unable to initialize a framework handle: %v", err) - } + sharedInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0) + nodeLister := sharedInformerFactory.Core().V1().Nodes().Lister() + sharedInformerFactory.Start(ctx.Done()) + sharedInformerFactory.WaitForCacheSync(ctx.Done()) - plugin, err := NewLowNodeUtilization(&LowNodeUtilizationArgs{ - Thresholds: tc.thresholds, - TargetThresholds: tc.targetThresholds, - UseDeviationThresholds: tc.useDeviationThresholds, - EvictableNamespaces: tc.evictableNamespaces, - }, - handle) - if err != nil { - t.Fatalf("Unable to initialize the plugin: %v", err) - } - plugin.(frameworktypes.BalancePlugin).Balance(ctx, tc.nodes) + collector = metricscollector.NewMetricsCollector(nodeLister, metricsClientset, labels.Everything()) + err := collector.Collect(ctx) + if err != nil { + t.Fatalf("unable to collect metrics: %v", err) + } + } - podsEvicted := podEvictor.TotalEvicted() - if tc.expectedPodsEvicted != podsEvicted { - t.Errorf("Expected %v pods to be evicted but %v got evicted", 
tc.expectedPodsEvicted, podsEvicted) + podsForEviction := make(map[string]struct{}) + for _, pod := range tc.evictedPods { + podsForEviction[pod] = struct{}{} + } + + evictionFailed := false + if len(tc.evictedPods) > 0 { + fakeClient.Fake.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) { + getAction := action.(core.CreateAction) + obj := getAction.GetObject() + if eviction, ok := obj.(*policy.Eviction); ok { + if _, exists := podsForEviction[eviction.Name]; exists { + return true, obj, nil + } + evictionFailed = true + return true, nil, fmt.Errorf("pod %q was unexpectedly evicted", eviction.Name) + } + return true, obj, nil + }) + } + + handle, podEvictor, err := frameworktesting.InitFrameworkHandle(ctx, fakeClient, nil, defaultevictor.DefaultEvictorArgs{NodeFit: true}, nil) + if err != nil { + t.Fatalf("Unable to initialize a framework handle: %v", err) + } + handle.MetricsCollectorImpl = collector + + plugin, err := NewLowNodeUtilization(&LowNodeUtilizationArgs{ + Thresholds: tc.thresholds, + TargetThresholds: tc.targetThresholds, + UseDeviationThresholds: tc.useDeviationThresholds, + EvictableNamespaces: tc.evictableNamespaces, + MetricsUtilization: MetricsUtilization{ + MetricsServer: metricsEnabled, + }, + }, + handle) + if err != nil { + t.Fatalf("Unable to initialize the plugin: %v", err) + } + plugin.(frameworktypes.BalancePlugin).Balance(ctx, tc.nodes) + + podsEvicted := podEvictor.TotalEvicted() + if expectedPodsEvicted != podsEvicted { + t.Errorf("Expected %v pods to be evicted but %v got evicted", expectedPodsEvicted, podsEvicted) + } + if evictionFailed { + t.Errorf("Pod evictions failed unexpectedly") + } } - if evictionFailed { - t.Errorf("Pod evictions failed unexpectedly") - } - }) + } + t.Run(tc.name, testFnc(false, tc.expectedPodsEvicted)) + t.Run(tc.name+" with metrics enabled", testFnc(true, tc.expectedPodsWithMetricsEvicted)) } } diff --git a/pkg/framework/plugins/nodeutilization/types.go b/pkg/framework/plugins/nodeutilization/types.go index 4823b0695..8e005fa02 100644 --- a/pkg/framework/plugins/nodeutilization/types.go +++ b/pkg/framework/plugins/nodeutilization/types.go @@ -28,6 +28,7 @@ type LowNodeUtilizationArgs struct { Thresholds api.ResourceThresholds `json:"thresholds"` TargetThresholds api.ResourceThresholds `json:"targetThresholds"` NumberOfNodes int `json:"numberOfNodes,omitempty"` + MetricsUtilization MetricsUtilization `json:"metricsUtilization,omitempty"` // Naming this one differently since namespaces are still // considered while considering resources used by pods @@ -41,10 +42,19 @@ type LowNodeUtilizationArgs struct { type HighNodeUtilizationArgs struct { metav1.TypeMeta `json:",inline"` - Thresholds api.ResourceThresholds `json:"thresholds"` - NumberOfNodes int `json:"numberOfNodes,omitempty"` + Thresholds api.ResourceThresholds `json:"thresholds"` + NumberOfNodes int `json:"numberOfNodes,omitempty"` + MetricsUtilization MetricsUtilization `json:"metricsUtilization,omitempty"` + // Naming this one differently since namespaces are still // considered while considering resources used by pods // but then filtered out before eviction EvictableNamespaces *api.Namespaces `json:"evictableNamespaces,omitempty"` } + +// MetricsUtilization allow to consume actual resource utilization from metrics +type MetricsUtilization struct { + // metricsServer enables metrics from a kubernetes metrics server. + // Please see https://kubernetes-sigs.github.io/metrics-server/ for more. 
+	MetricsServer bool `json:"metricsServer,omitempty"`
+}
diff --git a/pkg/framework/plugins/nodeutilization/usageclients.go b/pkg/framework/plugins/nodeutilization/usageclients.go
index 608523897..01d1cdce4 100644
--- a/pkg/framework/plugins/nodeutilization/usageclients.go
+++ b/pkg/framework/plugins/nodeutilization/usageclients.go
@@ -17,12 +17,16 @@ limitations under the License.
 package nodeutilization
 
 import (
+	"context"
 	"fmt"
 
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/klog/v2"
 	utilptr "k8s.io/utils/ptr"
+
+	"sigs.k8s.io/descheduler/pkg/descheduler/metricscollector"
 	nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
 	podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
 	"sigs.k8s.io/descheduler/pkg/utils"
@@ -100,3 +104,98 @@ func (s *requestedUsageClient) sync(nodes []*v1.Node) error {
 
 	return nil
 }
+
+type actualUsageClient struct {
+	resourceNames         []v1.ResourceName
+	getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc
+	metricsCollector      *metricscollector.MetricsCollector
+
+	_pods            map[string][]*v1.Pod
+	_nodeUtilization map[string]map[v1.ResourceName]*resource.Quantity
+}
+
+var _ usageClient = &actualUsageClient{}
+
+func newActualUsageClient(
+	resourceNames []v1.ResourceName,
+	getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc,
+	metricsCollector *metricscollector.MetricsCollector,
+) *actualUsageClient {
+	return &actualUsageClient{
+		resourceNames:         resourceNames,
+		getPodsAssignedToNode: getPodsAssignedToNode,
+		metricsCollector:      metricsCollector,
+	}
+}
+
+func (client *actualUsageClient) nodeUtilization(node string) map[v1.ResourceName]*resource.Quantity {
+	return client._nodeUtilization[node]
+}
+
+func (client *actualUsageClient) pods(node string) []*v1.Pod {
+	return client._pods[node]
+}
+
+func (client *actualUsageClient) podUsage(pod *v1.Pod) (map[v1.ResourceName]*resource.Quantity, error) {
+	// It's not efficient to keep track of every pod in the cluster when only a fraction of them gets evicted.
+	// Thus, take the current pod metrics as they are, without computing any smoothing (e.g. EWMA).
+ podMetrics, err := client.metricsCollector.MetricsClient().MetricsV1beta1().PodMetricses(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("unable to get podmetrics for %q/%q: %v", pod.Namespace, pod.Name, err) + } + + totalUsage := make(map[v1.ResourceName]*resource.Quantity) + for _, container := range podMetrics.Containers { + for _, resourceName := range client.resourceNames { + if resourceName == v1.ResourcePods { + continue + } + if _, exists := container.Usage[resourceName]; !exists { + return nil, fmt.Errorf("pod %v/%v: container %q is missing %q resource", pod.Namespace, pod.Name, container.Name, resourceName) + } + if totalUsage[resourceName] == nil { + totalUsage[resourceName] = utilptr.To[resource.Quantity](container.Usage[resourceName].DeepCopy()) + } else { + totalUsage[resourceName].Add(container.Usage[resourceName]) + } + } + } + + return totalUsage, nil +} + +func (client *actualUsageClient) sync(nodes []*v1.Node) error { + client._nodeUtilization = make(map[string]map[v1.ResourceName]*resource.Quantity) + client._pods = make(map[string][]*v1.Pod) + + nodesUsage, err := client.metricsCollector.AllNodesUsage() + if err != nil { + return err + } + + for _, node := range nodes { + pods, err := podutil.ListPodsOnANode(node.Name, client.getPodsAssignedToNode, nil) + if err != nil { + klog.V(2).InfoS("Node will not be processed, error accessing its pods", "node", klog.KObj(node), "err", err) + return fmt.Errorf("error accessing %q node's pods: %v", node.Name, err) + } + + nodeUsage, ok := nodesUsage[node.Name] + if !ok { + return fmt.Errorf("unable to find node %q in the collected metrics", node.Name) + } + nodeUsage[v1.ResourcePods] = resource.NewQuantity(int64(len(pods)), resource.DecimalSI) + + for _, resourceName := range client.resourceNames { + if _, exists := nodeUsage[resourceName]; !exists { + return fmt.Errorf("unable to find %q resource for collected %q node metric", resourceName, node.Name) + } + } + + // store the snapshot of pods from the same (or the closest) node utilization computation + client._pods[node.Name] = pods + client._nodeUtilization[node.Name] = nodeUsage + } + + return nil +} diff --git a/pkg/framework/plugins/nodeutilization/usageclients_test.go b/pkg/framework/plugins/nodeutilization/usageclients_test.go new file mode 100644 index 000000000..ac1886216 --- /dev/null +++ b/pkg/framework/plugins/nodeutilization/usageclients_test.go @@ -0,0 +1,139 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package nodeutilization + +import ( + "context" + "fmt" + "testing" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/informers" + fakeclientset "k8s.io/client-go/kubernetes/fake" + "k8s.io/metrics/pkg/apis/metrics/v1beta1" + fakemetricsclient "k8s.io/metrics/pkg/client/clientset/versioned/fake" + + "sigs.k8s.io/descheduler/pkg/descheduler/metricscollector" + podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod" + "sigs.k8s.io/descheduler/test" +) + +var ( + nodesgvr = schema.GroupVersionResource{Group: "metrics.k8s.io", Version: "v1beta1", Resource: "nodes"} + podsgvr = schema.GroupVersionResource{Group: "metrics.k8s.io", Version: "v1beta1", Resource: "pods"} +) + +func updateMetricsAndCheckNodeUtilization( + t *testing.T, + ctx context.Context, + newValue, expectedValue int64, + metricsClientset *fakemetricsclient.Clientset, + collector *metricscollector.MetricsCollector, + usageClient usageClient, + nodes []*v1.Node, + nodeName string, + nodemetrics *v1beta1.NodeMetrics, +) { + t.Logf("Set current node cpu usage to %v", newValue) + nodemetrics.Usage[v1.ResourceCPU] = *resource.NewMilliQuantity(newValue, resource.DecimalSI) + metricsClientset.Tracker().Update(nodesgvr, nodemetrics, "") + err := collector.Collect(ctx) + if err != nil { + t.Fatalf("failed to capture metrics: %v", err) + } + err = usageClient.sync(nodes) + if err != nil { + t.Fatalf("failed to capture a snapshot: %v", err) + } + nodeUtilization := usageClient.nodeUtilization(nodeName) + t.Logf("current node cpu usage: %v\n", nodeUtilization[v1.ResourceCPU].MilliValue()) + if nodeUtilization[v1.ResourceCPU].MilliValue() != expectedValue { + t.Fatalf("cpu node usage expected to be %v, got %v instead", expectedValue, nodeUtilization[v1.ResourceCPU].MilliValue()) + } + pods := usageClient.pods(nodeName) + fmt.Printf("pods: %#v\n", pods) + if len(pods) != 2 { + t.Fatalf("expected 2 pods for node %v, got %v instead", nodeName, len(pods)) + } +} + +func TestActualUsageClient(t *testing.T) { + n1 := test.BuildTestNode("n1", 2000, 3000, 10, nil) + n2 := test.BuildTestNode("n2", 2000, 3000, 10, nil) + n3 := test.BuildTestNode("n3", 2000, 3000, 10, nil) + + p1 := test.BuildTestPod("p1", 400, 0, n1.Name, nil) + p21 := test.BuildTestPod("p21", 400, 0, n2.Name, nil) + p22 := test.BuildTestPod("p22", 400, 0, n2.Name, nil) + p3 := test.BuildTestPod("p3", 400, 0, n3.Name, nil) + + nodes := []*v1.Node{n1, n2, n3} + + n1metrics := test.BuildNodeMetrics("n1", 400, 1714978816) + n2metrics := test.BuildNodeMetrics("n2", 1400, 1714978816) + n3metrics := test.BuildNodeMetrics("n3", 300, 1714978816) + + clientset := fakeclientset.NewSimpleClientset(n1, n2, n3, p1, p21, p22, p3) + metricsClientset := fakemetricsclient.NewSimpleClientset() + metricsClientset.Tracker().Create(nodesgvr, n1metrics, "") + metricsClientset.Tracker().Create(nodesgvr, n2metrics, "") + metricsClientset.Tracker().Create(nodesgvr, n3metrics, "") + + ctx := context.TODO() + + resourceNames := []v1.ResourceName{ + v1.ResourceCPU, + v1.ResourceMemory, + } + + sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0) + podInformer := sharedInformerFactory.Core().V1().Pods().Informer() + nodeLister := sharedInformerFactory.Core().V1().Nodes().Lister() + podsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer) + if err != nil { + t.Fatalf("Build get pods assigned to node function error: %v", err) + } + + 
sharedInformerFactory.Start(ctx.Done()) + sharedInformerFactory.WaitForCacheSync(ctx.Done()) + + collector := metricscollector.NewMetricsCollector(nodeLister, metricsClientset, labels.Everything()) + + usageClient := newActualUsageClient( + resourceNames, + podsAssignedToNode, + collector, + ) + + updateMetricsAndCheckNodeUtilization(t, ctx, + 1400, 1400, + metricsClientset, collector, usageClient, nodes, n2.Name, n2metrics, + ) + + updateMetricsAndCheckNodeUtilization(t, ctx, + 500, 1310, + metricsClientset, collector, usageClient, nodes, n2.Name, n2metrics, + ) + + updateMetricsAndCheckNodeUtilization(t, ctx, + 900, 1269, + metricsClientset, collector, usageClient, nodes, n2.Name, n2metrics, + ) +} diff --git a/pkg/framework/plugins/nodeutilization/zz_generated.deepcopy.go b/pkg/framework/plugins/nodeutilization/zz_generated.deepcopy.go index 927c9089f..ca108ef61 100644 --- a/pkg/framework/plugins/nodeutilization/zz_generated.deepcopy.go +++ b/pkg/framework/plugins/nodeutilization/zz_generated.deepcopy.go @@ -37,6 +37,7 @@ func (in *HighNodeUtilizationArgs) DeepCopyInto(out *HighNodeUtilizationArgs) { (*out)[key] = val } } + out.MetricsUtilization = in.MetricsUtilization if in.EvictableNamespaces != nil { in, out := &in.EvictableNamespaces, &out.EvictableNamespaces *out = new(api.Namespaces) @@ -81,6 +82,7 @@ func (in *LowNodeUtilizationArgs) DeepCopyInto(out *LowNodeUtilizationArgs) { (*out)[key] = val } } + out.MetricsUtilization = in.MetricsUtilization if in.EvictableNamespaces != nil { in, out := &in.EvictableNamespaces, &out.EvictableNamespaces *out = new(api.Namespaces) diff --git a/pkg/framework/profile/profile.go b/pkg/framework/profile/profile.go index 71234c3e6..a63613fda 100644 --- a/pkg/framework/profile/profile.go +++ b/pkg/framework/profile/profile.go @@ -20,21 +20,22 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" - "sigs.k8s.io/descheduler/metrics" - "sigs.k8s.io/descheduler/pkg/api" - "sigs.k8s.io/descheduler/pkg/descheduler/evictions" - podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod" - "sigs.k8s.io/descheduler/pkg/framework/pluginregistry" - frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types" - "sigs.k8s.io/descheduler/pkg/tracing" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" - "k8s.io/klog/v2" + + "sigs.k8s.io/descheduler/metrics" + "sigs.k8s.io/descheduler/pkg/api" + "sigs.k8s.io/descheduler/pkg/descheduler/evictions" + "sigs.k8s.io/descheduler/pkg/descheduler/metricscollector" + podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod" + "sigs.k8s.io/descheduler/pkg/framework/pluginregistry" + frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types" + "sigs.k8s.io/descheduler/pkg/tracing" ) // evictorImpl implements the Evictor interface so plugins @@ -67,6 +68,7 @@ func (ei *evictorImpl) Evict(ctx context.Context, pod *v1.Pod, opts evictions.Ev // handleImpl implements the framework handle which gets passed to plugins type handleImpl struct { clientSet clientset.Interface + metricsCollector *metricscollector.MetricsCollector getPodsAssignedToNodeFunc podutil.GetPodsAssignedToNodeFunc sharedInformerFactory informers.SharedInformerFactory evictor *evictorImpl @@ -79,6 +81,10 @@ func (hi *handleImpl) ClientSet() clientset.Interface { return hi.clientSet } +func (hi *handleImpl) MetricsCollector() *metricscollector.MetricsCollector { + return hi.metricsCollector +} + // GetPodsAssignedToNodeFunc 
retrieves GetPodsAssignedToNodeFunc implementation func (hi *handleImpl) GetPodsAssignedToNodeFunc() podutil.GetPodsAssignedToNodeFunc { return hi.getPodsAssignedToNodeFunc @@ -128,6 +134,7 @@ type handleImplOpts struct { sharedInformerFactory informers.SharedInformerFactory getPodsAssignedToNodeFunc podutil.GetPodsAssignedToNodeFunc podEvictor *evictions.PodEvictor + metricsCollector *metricscollector.MetricsCollector } // WithClientSet sets clientSet for the scheduling frameworkImpl. @@ -155,6 +162,12 @@ func WithGetPodsAssignedToNodeFnc(getPodsAssignedToNodeFunc podutil.GetPodsAssig } } +func WithMetricsCollector(metricsCollector *metricscollector.MetricsCollector) Option { + return func(o *handleImplOpts) { + o.metricsCollector = metricsCollector + } +} + func getPluginConfig(pluginName string, pluginConfigs []api.PluginConfig) (*api.PluginConfig, int) { for idx, pluginConfig := range pluginConfigs { if pluginConfig.Name == pluginName { @@ -253,6 +266,7 @@ func NewProfile(config api.DeschedulerProfile, reg pluginregistry.Registry, opts profileName: config.Name, podEvictor: hOpts.podEvictor, }, + metricsCollector: hOpts.metricsCollector, } pluginNames := append(config.Plugins.Deschedule.Enabled, config.Plugins.Balance.Enabled...) diff --git a/pkg/framework/types/types.go b/pkg/framework/types/types.go index 1d5e1d95c..6cb95ca24 100644 --- a/pkg/framework/types/types.go +++ b/pkg/framework/types/types.go @@ -24,6 +24,7 @@ import ( clientset "k8s.io/client-go/kubernetes" "sigs.k8s.io/descheduler/pkg/descheduler/evictions" + "sigs.k8s.io/descheduler/pkg/descheduler/metricscollector" podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod" ) @@ -36,6 +37,7 @@ type Handle interface { Evictor() Evictor GetPodsAssignedToNodeFunc() podutil.GetPodsAssignedToNodeFunc SharedInformerFactory() informers.SharedInformerFactory + MetricsCollector() *metricscollector.MetricsCollector } // Evictor defines an interface for filtering and evicting pods diff --git a/test/e2e/e2e_lownodeutilization_test.go b/test/e2e/e2e_lownodeutilization_test.go new file mode 100644 index 000000000..125c5497f --- /dev/null +++ b/test/e2e/e2e_lownodeutilization_test.go @@ -0,0 +1,304 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package e2e
+
+import (
+	"context"
+	"os"
+	"strings"
+	"testing"
+	"time"
+
+	appsv1 "k8s.io/api/apps/v1"
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/wait"
+	componentbaseconfig "k8s.io/component-base/config"
+	utilptr "k8s.io/utils/ptr"
+
+	"sigs.k8s.io/descheduler/pkg/api"
+	apiv1alpha2 "sigs.k8s.io/descheduler/pkg/api/v1alpha2"
+	"sigs.k8s.io/descheduler/pkg/descheduler/client"
+	"sigs.k8s.io/descheduler/pkg/framework/plugins/defaultevictor"
+	"sigs.k8s.io/descheduler/pkg/framework/plugins/nodeutilization"
+)
+
+func lowNodeUtilizationPolicy(lowNodeUtilizationArgs *nodeutilization.LowNodeUtilizationArgs, evictorArgs *defaultevictor.DefaultEvictorArgs, metricsCollectorEnabled bool) *apiv1alpha2.DeschedulerPolicy {
+	return &apiv1alpha2.DeschedulerPolicy{
+		MetricsCollector: apiv1alpha2.MetricsCollector{
+			Enabled: metricsCollectorEnabled,
+		},
+		Profiles: []apiv1alpha2.DeschedulerProfile{
+			{
+				Name: nodeutilization.LowNodeUtilizationPluginName + "Profile",
+				PluginConfigs: []apiv1alpha2.PluginConfig{
+					{
+						Name: nodeutilization.LowNodeUtilizationPluginName,
+						Args: runtime.RawExtension{
+							Object: lowNodeUtilizationArgs,
+						},
+					},
+					{
+						Name: defaultevictor.PluginName,
+						Args: runtime.RawExtension{
+							Object: evictorArgs,
+						},
+					},
+				},
+				Plugins: apiv1alpha2.Plugins{
+					Filter: apiv1alpha2.PluginSet{
+						Enabled: []string{
+							defaultevictor.PluginName,
+						},
+					},
+					Balance: apiv1alpha2.PluginSet{
+						Enabled: []string{
+							nodeutilization.LowNodeUtilizationPluginName,
+						},
+					},
+				},
+			},
+		},
+	}
+}
+
+func TestLowNodeUtilizationKubernetesMetrics(t *testing.T) {
+	ctx := context.Background()
+
+	clientSet, err := client.CreateClient(componentbaseconfig.ClientConnectionConfiguration{Kubeconfig: os.Getenv("KUBECONFIG")}, "")
+	if err != nil {
+		t.Errorf("Error during kubernetes client creation with %v", err)
+	}
+
+	metricsClient, err := client.CreateMetricsClient(componentbaseconfig.ClientConnectionConfiguration{Kubeconfig: os.Getenv("KUBECONFIG")}, "descheduler")
+	if err != nil {
+		t.Errorf("Error during kubernetes metrics client creation with %v", err)
+	}
+
+	nodeList, err := clientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
+	if err != nil {
+		t.Errorf("Error listing node with %v", err)
+	}
+
+	_, workerNodes := splitNodesAndWorkerNodes(nodeList.Items)
+
+	testNamespace := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "e2e-" + strings.ToLower(t.Name())}}
+	t.Logf("Creating testing namespace %q", testNamespace.Name)
+	if _, err := clientSet.CoreV1().Namespaces().Create(ctx, testNamespace, metav1.CreateOptions{}); err != nil {
+		t.Fatalf("Unable to create ns %v: %v", testNamespace.Name, err)
+	}
+	defer clientSet.CoreV1().Namespaces().Delete(ctx, testNamespace.Name, metav1.DeleteOptions{})
+
+	t.Log("Creating cpu stressor pods")
+	testLabel := map[string]string{"app": "test-lownodeutilization-kubernetes-metrics", "name": "test-lownodeutilization-kubernetes-metrics"}
+	deploymentObj := buildTestDeployment("lownodeutilization-kubernetes-metrics-pod", testNamespace.Name, 0, testLabel, nil)
+	deploymentObj.Spec.Template.Spec.Containers[0].Image = "narmidm/k8s-pod-cpu-stressor:latest"
+	deploymentObj.Spec.Template.Spec.Containers[0].Args = []string{"-cpu=3", "-duration=10s", "-forever"}
+	deploymentObj.Spec.Template.Spec.Containers[0].Resources = v1.ResourceRequirements{
+		Limits: v1.ResourceList{
+			v1.ResourceCPU: resource.MustParse("3000m"),
+		},
+		Requests: v1.ResourceList{
+			v1.ResourceCPU: resource.MustParse("0m"),
+		},
+	}
+
+	tests := []struct {
+		name                    string
+		replicasNum             int
+		beforeFunc              func(deployment *appsv1.Deployment)
+		expectedEvictedPodCount int
+		lowNodeUtilizationArgs  *nodeutilization.LowNodeUtilizationArgs
+		evictorArgs             *defaultevictor.DefaultEvictorArgs
+		metricsCollectorEnabled bool
+	}{
+		{
+			name:        "metric server not enabled",
+			replicasNum: 4,
+			beforeFunc: func(deployment *appsv1.Deployment) {
+				deployment.Spec.Replicas = utilptr.To[int32](4)
+				deployment.Spec.Template.Spec.NodeName = workerNodes[0].Name
+			},
+			expectedEvictedPodCount: 0,
+			lowNodeUtilizationArgs: &nodeutilization.LowNodeUtilizationArgs{
+				Thresholds: api.ResourceThresholds{
+					v1.ResourceCPU:  30,
+					v1.ResourcePods: 30,
+				},
+				TargetThresholds: api.ResourceThresholds{
+					v1.ResourceCPU:  50,
+					v1.ResourcePods: 50,
+				},
+				MetricsUtilization: nodeutilization.MetricsUtilization{
+					MetricsServer: true,
+				},
+			},
+			evictorArgs:             &defaultevictor.DefaultEvictorArgs{},
+			metricsCollectorEnabled: false,
+		},
+		{
+			name:        "requested cpu resource zero, actual cpu utilization 3 per pod",
+			replicasNum: 4,
+			beforeFunc: func(deployment *appsv1.Deployment) {
+				deployment.Spec.Replicas = utilptr.To[int32](4)
+				deployment.Spec.Template.Spec.NodeName = workerNodes[0].Name
+			},
+			expectedEvictedPodCount: 2,
+			lowNodeUtilizationArgs: &nodeutilization.LowNodeUtilizationArgs{
+				Thresholds: api.ResourceThresholds{
+					v1.ResourceCPU:  30,
+					v1.ResourcePods: 30,
+				},
+				TargetThresholds: api.ResourceThresholds{
+					v1.ResourceCPU:  50,
+					v1.ResourcePods: 50,
+				},
+				MetricsUtilization: nodeutilization.MetricsUtilization{
+					MetricsServer: true,
+				},
+			},
+			evictorArgs:             &defaultevictor.DefaultEvictorArgs{},
+			metricsCollectorEnabled: true,
+		},
+	}
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			t.Logf("Creating deployment %v in %v namespace", deploymentObj.Name, deploymentObj.Namespace)
+			tc.beforeFunc(deploymentObj)
+
+			_, err = clientSet.AppsV1().Deployments(deploymentObj.Namespace).Create(ctx, deploymentObj, metav1.CreateOptions{})
+			if err != nil {
+				t.Logf("Error creating deployment: %v", err)
+				if err = clientSet.AppsV1().Deployments(deploymentObj.Namespace).DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{
+					LabelSelector: labels.SelectorFromSet(deploymentObj.Labels).String(),
+				}); err != nil {
+					t.Fatalf("Unable to delete deployment: %v", err)
+				}
+				return
+			}
+			defer func() {
+				clientSet.AppsV1().Deployments(deploymentObj.Namespace).Delete(ctx, deploymentObj.Name, metav1.DeleteOptions{})
+				waitForPodsToDisappear(ctx, t, clientSet, deploymentObj.Labels, deploymentObj.Namespace)
+			}()
+			waitForPodsRunning(ctx, t, clientSet, deploymentObj.Labels, tc.replicasNum, deploymentObj.Namespace)
+			// wait until workerNodes[0].Name has the right actual cpu utilization and all the testing pods are running
+			// and producing ~12 cores in total
+			wait.PollUntilWithContext(ctx, 5*time.Second, func(context.Context) (done bool, err error) {
+				item, err := metricsClient.MetricsV1beta1().NodeMetricses().Get(ctx, workerNodes[0].Name, metav1.GetOptions{})
+				if err != nil || item.Usage.Cpu().Value() < 12 {
+					t.Logf("Waiting for %q nodemetrics cpu utilization to get over 12 (get error: %v)", workerNodes[0].Name, err)
+					return false, nil
+				}
+				totalCpu := resource.NewMilliQuantity(0, resource.DecimalSI)
+				podItems, err :=
metricsClient.MetricsV1beta1().PodMetricses(deploymentObj.Namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + t.Logf("unable to list podmetricses: %v", err) + return false, nil + } + for _, podMetrics := range podItems.Items { + for _, container := range podMetrics.Containers { + if _, exists := container.Usage[v1.ResourceCPU]; !exists { + continue + } + totalCpu.Add(container.Usage[v1.ResourceCPU]) + } + } + // Value() will round up (e.g. 11.1 -> 12), which is still ok + t.Logf("Waiting for totalCpu to get to 12 at least, got %v\n", totalCpu.Value()) + return totalCpu.Value() >= 12, nil + }) + + preRunNames := sets.NewString(getCurrentPodNames(ctx, clientSet, testNamespace.Name, t)...) + + // Deploy the descheduler with the configured policy + deschedulerPolicyConfigMapObj, err := deschedulerPolicyConfigMap(lowNodeUtilizationPolicy(tc.lowNodeUtilizationArgs, tc.evictorArgs, tc.metricsCollectorEnabled)) + if err != nil { + t.Fatalf("Error creating %q CM: %v", deschedulerPolicyConfigMapObj.Name, err) + } + + t.Logf("Creating %q policy CM with LowNodeUtilization configured...", deschedulerPolicyConfigMapObj.Name) + _, err = clientSet.CoreV1().ConfigMaps(deschedulerPolicyConfigMapObj.Namespace).Create(ctx, deschedulerPolicyConfigMapObj, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Error creating %q CM: %v", deschedulerPolicyConfigMapObj.Name, err) + } + + defer func() { + t.Logf("Deleting %q CM...", deschedulerPolicyConfigMapObj.Name) + err = clientSet.CoreV1().ConfigMaps(deschedulerPolicyConfigMapObj.Namespace).Delete(ctx, deschedulerPolicyConfigMapObj.Name, metav1.DeleteOptions{}) + if err != nil { + t.Fatalf("Unable to delete %q CM: %v", deschedulerPolicyConfigMapObj.Name, err) + } + }() + + deschedulerDeploymentObj := deschedulerDeployment(testNamespace.Name) + t.Logf("Creating descheduler deployment %v", deschedulerDeploymentObj.Name) + _, err = clientSet.AppsV1().Deployments(deschedulerDeploymentObj.Namespace).Create(ctx, deschedulerDeploymentObj, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Error creating %q deployment: %v", deschedulerDeploymentObj.Name, err) + } + + deschedulerPodName := "" + defer func() { + if deschedulerPodName != "" { + printPodLogs(ctx, t, clientSet, deschedulerPodName) + } + + t.Logf("Deleting %q deployment...", deschedulerDeploymentObj.Name) + err = clientSet.AppsV1().Deployments(deschedulerDeploymentObj.Namespace).Delete(ctx, deschedulerDeploymentObj.Name, metav1.DeleteOptions{}) + if err != nil { + t.Fatalf("Unable to delete %q deployment: %v", deschedulerDeploymentObj.Name, err) + } + + waitForPodsToDisappear(ctx, t, clientSet, deschedulerDeploymentObj.Labels, deschedulerDeploymentObj.Namespace) + }() + + t.Logf("Waiting for the descheduler pod running") + deschedulerPods := waitForPodsRunning(ctx, t, clientSet, deschedulerDeploymentObj.Labels, 1, deschedulerDeploymentObj.Namespace) + if len(deschedulerPods) != 0 { + deschedulerPodName = deschedulerPods[0].Name + } + + // Run LowNodeUtilization plugin + var meetsExpectations bool + var actualEvictedPodCount int + if err = wait.PollUntilContextTimeout(ctx, 5*time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) { + currentRunNames := sets.NewString(getCurrentPodNames(ctx, clientSet, testNamespace.Name, t)...) 
+ actualEvictedPod := preRunNames.Difference(currentRunNames) + actualEvictedPodCount = actualEvictedPod.Len() + t.Logf("preRunNames: %v, currentRunNames: %v, actualEvictedPodCount: %v\n", preRunNames.List(), currentRunNames.List(), actualEvictedPodCount) + if actualEvictedPodCount != tc.expectedEvictedPodCount { + t.Logf("Expecting %v number of pods evicted, got %v instead", tc.expectedEvictedPodCount, actualEvictedPodCount) + return false, nil + } + meetsExpectations = true + return true, nil + }); err != nil { + t.Errorf("Error waiting for descheduler running: %v", err) + } + + if !meetsExpectations { + t.Errorf("Unexpected number of pods have been evicted, got %v, expected %v", actualEvictedPodCount, tc.expectedEvictedPodCount) + } else { + t.Logf("Total of %d Pods were evicted for %s", actualEvictedPodCount, tc.name) + } + }) + } +} diff --git a/test/run-e2e-tests.sh b/test/run-e2e-tests.sh index 65ebe1527..39cfdd4ca 100755 --- a/test/run-e2e-tests.sh +++ b/test/run-e2e-tests.sh @@ -99,5 +99,10 @@ if [ -z "${SKIP_KUBEVIRT_INSTALL}" ]; then kubectl -n kubevirt patch kubevirt kubevirt --type=merge --patch '{"spec":{"configuration":{"developerConfiguration":{"useEmulation":true}}}}' fi +METRICS_SERVER_VERSION="v0.5.0" +kubectl apply -f https://github.com/kubernetes-sigs/metrics-server/releases/download/${METRICS_SERVER_VERSION}/components.yaml +kubectl patch -n kube-system deployment metrics-server --type=json \ + -p '[{"op":"add","path":"/spec/template/spec/containers/0/args/-","value":"--kubelet-insecure-tls"}]' + PRJ_PREFIX="sigs.k8s.io/descheduler" go test ${PRJ_PREFIX}/test/e2e/ -v -timeout 0 diff --git a/test/test_utils.go b/test/test_utils.go index 21c85a07b..5a3169dd1 100644 --- a/test/test_utils.go +++ b/test/test_utils.go @@ -23,17 +23,17 @@ import ( "testing" "time" - policyv1 "k8s.io/api/policy/v1" - "k8s.io/apimachinery/pkg/util/intstr" - appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" + policyv1 "k8s.io/api/policy/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" + "k8s.io/metrics/pkg/apis/metrics/v1beta1" utilptr "k8s.io/utils/ptr" ) @@ -89,6 +89,26 @@ func BuildTestPDB(name, appLabel string) *policyv1.PodDisruptionBudget { return pdb } +// BuildPodMetrics creates a test podmetrics with given parameters. +func BuildPodMetrics(name string, millicpu, mem int64) *v1beta1.PodMetrics { + return &v1beta1.PodMetrics{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + }, + Window: metav1.Duration{Duration: 20010000000}, + Containers: []v1beta1.ContainerMetrics{ + { + Name: "container-1", + Usage: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(millicpu, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(mem, resource.BinarySI), + }, + }, + }, + } +} + // GetMirrorPodAnnotation returns the annotation needed for mirror pod. 
func GetMirrorPodAnnotation() map[string]string { return map[string]string{ @@ -157,6 +177,19 @@ func BuildTestNode(name string, millicpu, mem, pods int64, apply func(*v1.Node)) return node } +func BuildNodeMetrics(name string, millicpu, mem int64) *v1beta1.NodeMetrics { + return &v1beta1.NodeMetrics{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Window: metav1.Duration{Duration: 20010000000}, + Usage: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(millicpu, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(mem, resource.BinarySI), + }, + } +} + // MakeBestEffortPod makes the given pod a BestEffort pod func MakeBestEffortPod(pod *v1.Pod) { pod.Spec.Containers[0].Resources.Requests = nil