From b56794708d2192125e9a2c695d9b87dc88ceea6d Mon Sep 17 00:00:00 2001 From: Jan Chaloupka Date: Tue, 5 Nov 2024 21:13:27 +0100 Subject: [PATCH] descheduler: wire the metrics collector with the framework handle --- cmd/descheduler/app/options/options.go | 3 + pkg/api/types.go | 10 +++ pkg/api/v1alpha2/types.go | 10 +++ pkg/descheduler/client/client.go | 22 ++++- pkg/descheduler/descheduler.go | 60 ++++++++----- pkg/descheduler/descheduler_test.go | 117 +++++++++++++++++++++++++ pkg/framework/profile/profile.go | 8 ++ 7 files changed, 209 insertions(+), 21 deletions(-) diff --git a/cmd/descheduler/app/options/options.go b/cmd/descheduler/app/options/options.go index 384afbda2..e8831b8e6 100644 --- a/cmd/descheduler/app/options/options.go +++ b/cmd/descheduler/app/options/options.go @@ -26,6 +26,8 @@ import ( clientset "k8s.io/client-go/kubernetes" componentbaseconfig "k8s.io/component-base/config" componentbaseoptions "k8s.io/component-base/config/options" + metricsclient "k8s.io/metrics/pkg/client/clientset/versioned" + "sigs.k8s.io/descheduler/pkg/apis/componentconfig" "sigs.k8s.io/descheduler/pkg/apis/componentconfig/v1alpha1" deschedulerscheme "sigs.k8s.io/descheduler/pkg/descheduler/scheme" @@ -42,6 +44,7 @@ type DeschedulerServer struct { Client clientset.Interface EventClient clientset.Interface + MetricsClient metricsclient.Interface SecureServing *apiserveroptions.SecureServingOptionsWithLoopback DisableMetrics bool EnableHTTP2 bool diff --git a/pkg/api/types.go b/pkg/api/types.go index 6917e6cdb..f282d0fe3 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -41,6 +41,9 @@ type DeschedulerPolicy struct { // MaxNoOfPodsToTotal restricts maximum of pods to be evicted total. MaxNoOfPodsToEvictTotal *uint + + // MetricsCollector configures collection of metrics about actual resource utilization + MetricsCollector MetricsCollector } // Namespaces carries a list of included/excluded namespaces @@ -84,3 +87,10 @@ type PluginSet struct { Enabled []string Disabled []string } + +// MetricsCollector configures collection of metrics about actual resource utilization +type MetricsCollector struct { + // Enabled metrics collection from kubernetes metrics. + // Later, the collection can be extended to other providers. + Enabled bool +} diff --git a/pkg/api/v1alpha2/types.go b/pkg/api/v1alpha2/types.go index d663efb86..9a88af746 100644 --- a/pkg/api/v1alpha2/types.go +++ b/pkg/api/v1alpha2/types.go @@ -40,6 +40,9 @@ type DeschedulerPolicy struct { // MaxNoOfPodsToTotal restricts maximum of pods to be evicted total. MaxNoOfPodsToEvictTotal *uint `json:"maxNoOfPodsToEvictTotal,omitempty"` + + // MetricsCollector configures collection of metrics about actual resource utilization + MetricsCollector MetricsCollector `json:"metricsCollector,omitempty"` } type DeschedulerProfile struct { @@ -66,3 +69,10 @@ type PluginSet struct { Enabled []string `json:"enabled"` Disabled []string `json:"disabled"` } + +// MetricsCollector configures collection of metrics about actual resource utilization +type MetricsCollector struct { + // Enabled metrics collection from kubernetes metrics. + // Later, the collection can be extended to other providers. + Enabled bool +} diff --git a/pkg/descheduler/client/client.go b/pkg/descheduler/client/client.go index 964ed3b9b..26b55ca9c 100644 --- a/pkg/descheduler/client/client.go +++ b/pkg/descheduler/client/client.go @@ -21,6 +21,7 @@ import ( clientset "k8s.io/client-go/kubernetes" componentbaseconfig "k8s.io/component-base/config" + metricsclient "k8s.io/metrics/pkg/client/clientset/versioned" // Ensure to load all auth plugins. _ "k8s.io/client-go/plugin/pkg/client/auth" @@ -28,7 +29,7 @@ import ( "k8s.io/client-go/tools/clientcmd" ) -func CreateClient(clientConnection componentbaseconfig.ClientConnectionConfiguration, userAgt string) (clientset.Interface, error) { +func createConfig(clientConnection componentbaseconfig.ClientConnectionConfiguration, userAgt string) (*rest.Config, error) { var cfg *rest.Config if len(clientConnection.Kubeconfig) != 0 { master, err := GetMasterFromKubeconfig(clientConnection.Kubeconfig) @@ -56,9 +57,28 @@ func CreateClient(clientConnection componentbaseconfig.ClientConnectionConfigura cfg = rest.AddUserAgent(cfg, userAgt) } + return cfg, nil +} + +func CreateClient(clientConnection componentbaseconfig.ClientConnectionConfiguration, userAgt string) (clientset.Interface, error) { + cfg, err := createConfig(clientConnection, userAgt) + if err != nil { + return nil, fmt.Errorf("unable to create config: %v", err) + } + return clientset.NewForConfig(cfg) } +func CreateMetricsClient(clientConnection componentbaseconfig.ClientConnectionConfiguration, userAgt string) (metricsclient.Interface, error) { + cfg, err := createConfig(clientConnection, userAgt) + if err != nil { + return nil, fmt.Errorf("unable to create config: %v", err) + } + + // Create the metrics clientset to access the metrics.k8s.io API + return metricsclient.NewForConfig(cfg) +} + func GetMasterFromKubeconfig(filename string) (string, error) { config, err := clientcmd.LoadFromFile(filename) if err != nil { diff --git a/pkg/descheduler/descheduler.go b/pkg/descheduler/descheduler.go index b50757e1b..76b2bee06 100644 --- a/pkg/descheduler/descheduler.go +++ b/pkg/descheduler/descheduler.go @@ -23,43 +23,41 @@ import ( "strconv" "time" - schedulingv1 "k8s.io/api/scheduling/v1" - "k8s.io/apimachinery/pkg/runtime/schema" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" + v1 "k8s.io/api/core/v1" + policy "k8s.io/api/policy/v1" + schedulingv1 "k8s.io/api/scheduling/v1" "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + utilversion "k8s.io/apimachinery/pkg/util/version" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/discovery" "k8s.io/client-go/informers" + clientset "k8s.io/client-go/kubernetes" + fakeclientset "k8s.io/client-go/kubernetes/fake" + core "k8s.io/client-go/testing" "k8s.io/client-go/tools/events" componentbaseconfig "k8s.io/component-base/config" "k8s.io/klog/v2" - v1 "k8s.io/api/core/v1" - policy "k8s.io/api/policy/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" - utilversion "k8s.io/apimachinery/pkg/util/version" - "k8s.io/apimachinery/pkg/util/wait" - clientset "k8s.io/client-go/kubernetes" - fakeclientset "k8s.io/client-go/kubernetes/fake" - core "k8s.io/client-go/testing" - - "sigs.k8s.io/descheduler/pkg/descheduler/client" - eutils "sigs.k8s.io/descheduler/pkg/descheduler/evictions/utils" - nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node" - "sigs.k8s.io/descheduler/pkg/tracing" - "sigs.k8s.io/descheduler/pkg/utils" - "sigs.k8s.io/descheduler/pkg/version" - "sigs.k8s.io/descheduler/cmd/descheduler/app/options" "sigs.k8s.io/descheduler/metrics" "sigs.k8s.io/descheduler/pkg/api" + "sigs.k8s.io/descheduler/pkg/descheduler/client" "sigs.k8s.io/descheduler/pkg/descheduler/evictions" + eutils "sigs.k8s.io/descheduler/pkg/descheduler/evictions/utils" + "sigs.k8s.io/descheduler/pkg/descheduler/metricscollector" + nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node" podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod" "sigs.k8s.io/descheduler/pkg/framework/pluginregistry" frameworkprofile "sigs.k8s.io/descheduler/pkg/framework/profile" frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types" + "sigs.k8s.io/descheduler/pkg/tracing" + "sigs.k8s.io/descheduler/pkg/utils" + "sigs.k8s.io/descheduler/pkg/version" ) type eprunner func(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status @@ -78,6 +76,7 @@ type descheduler struct { eventRecorder events.EventRecorder podEvictor *evictions.PodEvictor podEvictionReactionFnc func(*fakeclientset.Clientset) func(action core.Action) (bool, runtime.Object, error) + metricsCollector *metricscollector.MetricsCollector } type informerResources struct { @@ -152,6 +151,11 @@ func newDescheduler(rs *options.DeschedulerServer, deschedulerPolicy *api.Desche WithMetricsEnabled(!rs.DisableMetrics), ) + var metricsCollector *metricscollector.MetricsCollector + if deschedulerPolicy.MetricsCollector.Enabled { + metricsCollector = metricscollector.NewMetricsCollector(rs.Client, rs.MetricsClient) + } + return &descheduler{ rs: rs, ir: ir, @@ -161,6 +165,7 @@ func newDescheduler(rs *options.DeschedulerServer, deschedulerPolicy *api.Desche eventRecorder: eventRecorder, podEvictor: podEvictor, podEvictionReactionFnc: podEvictionReactionFnc, + metricsCollector: metricsCollector, }, nil } @@ -240,6 +245,7 @@ func (d *descheduler) runProfiles(ctx context.Context, client clientset.Interfac frameworkprofile.WithSharedInformerFactory(d.sharedInformerFactory), frameworkprofile.WithPodEvictor(d.podEvictor), frameworkprofile.WithGetPodsAssignedToNodeFnc(d.getPodsAssignedToNode), + frameworkprofile.WithMetricsCollector(d.metricsCollector), ) if err != nil { klog.ErrorS(err, "unable to create a profile", "profile", profile.Name) @@ -304,6 +310,14 @@ func Run(ctx context.Context, rs *options.DeschedulerServer) error { return err } + if deschedulerPolicy.MetricsCollector.Enabled { + metricsClient, err := client.CreateMetricsClient(clientConnection, "descheduler") + if err != nil { + return err + } + rs.MetricsClient = metricsClient + } + runFn := func() error { return RunDeschedulerStrategies(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion) } @@ -411,6 +425,12 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer sharedInformerFactory.Start(ctx.Done()) sharedInformerFactory.WaitForCacheSync(ctx.Done()) + go func() { + klog.V(2).Infof("Starting metrics collector") + descheduler.metricsCollector.Run(ctx) + klog.V(2).Infof("Stopped metrics collector") + }() + wait.NonSlidingUntil(func() { // A next context is created here intentionally to avoid nesting the spans via context. sCtx, sSpan := tracing.Tracer().Start(ctx, "NonSlidingUntil") diff --git a/pkg/descheduler/descheduler_test.go b/pkg/descheduler/descheduler_test.go index 76bbf2e96..445eb8023 100644 --- a/pkg/descheduler/descheduler_test.go +++ b/pkg/descheduler/descheduler_test.go @@ -16,11 +16,16 @@ import ( fakeclientset "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" "k8s.io/klog/v2" + "k8s.io/metrics/pkg/apis/metrics/v1beta1" + fakemetricsclient "k8s.io/metrics/pkg/client/clientset/versioned/fake" utilptr "k8s.io/utils/ptr" + "sigs.k8s.io/descheduler/cmd/descheduler/app/options" "sigs.k8s.io/descheduler/pkg/api" + "sigs.k8s.io/descheduler/pkg/descheduler/metricscollector" "sigs.k8s.io/descheduler/pkg/framework/pluginregistry" "sigs.k8s.io/descheduler/pkg/framework/plugins/defaultevictor" + "sigs.k8s.io/descheduler/pkg/framework/plugins/nodeutilization" "sigs.k8s.io/descheduler/pkg/framework/plugins/removeduplicates" "sigs.k8s.io/descheduler/pkg/framework/plugins/removepodsviolatingnodetaints" "sigs.k8s.io/descheduler/pkg/utils" @@ -33,6 +38,7 @@ func initPluginRegistry() { pluginregistry.Register(removeduplicates.PluginName, removeduplicates.New, &removeduplicates.RemoveDuplicates{}, &removeduplicates.RemoveDuplicatesArgs{}, removeduplicates.ValidateRemoveDuplicatesArgs, removeduplicates.SetDefaults_RemoveDuplicatesArgs, pluginregistry.PluginRegistry) pluginregistry.Register(defaultevictor.PluginName, defaultevictor.New, &defaultevictor.DefaultEvictor{}, &defaultevictor.DefaultEvictorArgs{}, defaultevictor.ValidateDefaultEvictorArgs, defaultevictor.SetDefaults_DefaultEvictorArgs, pluginregistry.PluginRegistry) pluginregistry.Register(removepodsviolatingnodetaints.PluginName, removepodsviolatingnodetaints.New, &removepodsviolatingnodetaints.RemovePodsViolatingNodeTaints{}, &removepodsviolatingnodetaints.RemovePodsViolatingNodeTaintsArgs{}, removepodsviolatingnodetaints.ValidateRemovePodsViolatingNodeTaintsArgs, removepodsviolatingnodetaints.SetDefaults_RemovePodsViolatingNodeTaintsArgs, pluginregistry.PluginRegistry) + pluginregistry.Register(nodeutilization.LowNodeUtilizationPluginName, nodeutilization.NewLowNodeUtilization, &nodeutilization.LowNodeUtilization{}, &nodeutilization.LowNodeUtilizationArgs{}, nodeutilization.ValidateLowNodeUtilizationArgs, nodeutilization.SetDefaults_LowNodeUtilizationArgs, pluginregistry.PluginRegistry) } func removePodsViolatingNodeTaintsPolicy() *api.DeschedulerPolicy { @@ -99,6 +105,44 @@ func removeDuplicatesPolicy() *api.DeschedulerPolicy { } } +func lowNodeUtilizationPolicy(thresholds, targetThresholds api.ResourceThresholds, metricsEnabled bool) *api.DeschedulerPolicy { + return &api.DeschedulerPolicy{ + Profiles: []api.DeschedulerProfile{ + { + Name: "Profile", + PluginConfigs: []api.PluginConfig{ + { + Name: nodeutilization.LowNodeUtilizationPluginName, + Args: &nodeutilization.LowNodeUtilizationArgs{ + Thresholds: thresholds, + TargetThresholds: targetThresholds, + MetricsUtilization: nodeutilization.MetricsUtilization{ + MetricsServer: metricsEnabled, + }, + }, + }, + { + Name: defaultevictor.PluginName, + Args: &defaultevictor.DefaultEvictorArgs{}, + }, + }, + Plugins: api.Plugins{ + Filter: api.PluginSet{ + Enabled: []string{ + defaultevictor.PluginName, + }, + }, + Balance: api.PluginSet{ + Enabled: []string{ + nodeutilization.LowNodeUtilizationPluginName, + }, + }, + }, + }, + }, + } +} + func initDescheduler(t *testing.T, ctx context.Context, internalDeschedulerPolicy *api.DeschedulerPolicy, objects ...runtime.Object) (*options.DeschedulerServer, *descheduler, *fakeclientset.Clientset) { client := fakeclientset.NewSimpleClientset(objects...) eventClient := fakeclientset.NewSimpleClientset(objects...) @@ -539,3 +583,76 @@ func TestDeschedulingLimits(t *testing.T) { }) } } + +func TestLoadAwareDescheduling(t *testing.T) { + initPluginRegistry() + + ownerRef1 := test.GetReplicaSetOwnerRefList() + updatePod := func(pod *v1.Pod) { + pod.ObjectMeta.OwnerReferences = ownerRef1 + } + + ctx := context.Background() + node1 := test.BuildTestNode("n1", 2000, 3000, 10, taintNodeNoSchedule) + node2 := test.BuildTestNode("n2", 2000, 3000, 10, nil) + nodes := []*v1.Node{node1, node2} + + p1 := test.BuildTestPod("p1", 300, 0, node1.Name, updatePod) + p2 := test.BuildTestPod("p2", 300, 0, node1.Name, updatePod) + p3 := test.BuildTestPod("p3", 300, 0, node1.Name, updatePod) + p4 := test.BuildTestPod("p4", 300, 0, node1.Name, updatePod) + p5 := test.BuildTestPod("p5", 300, 0, node1.Name, updatePod) + + ctxCancel, cancel := context.WithCancel(ctx) + _, descheduler, client := initDescheduler( + t, + ctxCancel, + lowNodeUtilizationPolicy( + api.ResourceThresholds{ + v1.ResourceCPU: 30, + v1.ResourcePods: 30, + }, + api.ResourceThresholds{ + v1.ResourceCPU: 50, + v1.ResourcePods: 50, + }, + true, // enabled metrics utilization + ), + node1, node2, p1, p2, p3, p4, p5) + defer cancel() + + nodemetricses := []*v1beta1.NodeMetrics{ + test.BuildNodeMetrics("n1", 2400, 3000), + test.BuildNodeMetrics("n2", 400, 0), + } + + podmetricses := []*v1beta1.PodMetrics{ + test.BuildPodMetrics("p1", 400, 0), + test.BuildPodMetrics("p2", 400, 0), + test.BuildPodMetrics("p3", 400, 0), + test.BuildPodMetrics("p4", 400, 0), + test.BuildPodMetrics("p5", 400, 0), + } + + var metricsObjs []runtime.Object + for _, nodemetrics := range nodemetricses { + metricsObjs = append(metricsObjs, nodemetrics) + } + for _, podmetrics := range podmetricses { + metricsObjs = append(metricsObjs, podmetrics) + } + + metricsClientset := fakemetricsclient.NewSimpleClientset(metricsObjs...) + descheduler.metricsCollector = metricscollector.NewMetricsCollector(client, metricsClientset) + descheduler.metricsCollector.Collect(ctx) + + err := descheduler.runDeschedulerLoop(ctx, nodes) + if err != nil { + t.Fatalf("Unable to run a descheduling loop: %v", err) + } + totalEs := descheduler.podEvictor.TotalEvicted() + if totalEs != 2 { + t.Fatalf("Expected %v evictions in total, got %v instead", 2, totalEs) + } + t.Logf("Total evictions: %v", totalEs) +} diff --git a/pkg/framework/profile/profile.go b/pkg/framework/profile/profile.go index 044de1198..7faab22bd 100644 --- a/pkg/framework/profile/profile.go +++ b/pkg/framework/profile/profile.go @@ -134,6 +134,7 @@ type handleImplOpts struct { sharedInformerFactory informers.SharedInformerFactory getPodsAssignedToNodeFunc podutil.GetPodsAssignedToNodeFunc podEvictor *evictions.PodEvictor + metricsCollector *metricscollector.MetricsCollector } // WithClientSet sets clientSet for the scheduling frameworkImpl. @@ -161,6 +162,12 @@ func WithGetPodsAssignedToNodeFnc(getPodsAssignedToNodeFunc podutil.GetPodsAssig } } +func WithMetricsCollector(metricsCollector *metricscollector.MetricsCollector) Option { + return func(o *handleImplOpts) { + o.metricsCollector = metricsCollector + } +} + func getPluginConfig(pluginName string, pluginConfigs []api.PluginConfig) (*api.PluginConfig, int) { for idx, pluginConfig := range pluginConfigs { if pluginConfig.Name == pluginName { @@ -259,6 +266,7 @@ func NewProfile(config api.DeschedulerProfile, reg pluginregistry.Registry, opts profileName: config.Name, podEvictor: hOpts.podEvictor, }, + metricsCollector: hOpts.metricsCollector, } pluginNames := append(config.Plugins.Deschedule.Enabled, config.Plugins.Balance.Enabled...)