From 22d9230a67e50e16fba625a55285f2a3166de3b2 Mon Sep 17 00:00:00 2001
From: Simon Scharf
Date: Fri, 4 Oct 2024 13:20:28 +0200
Subject: [PATCH] Make sure dry runs see all the resources a normal run would (#1526)

* generic resource handling, so that a dry run has all the expected resource types and objects

* simpler code and better names

* fix imports
---
 pkg/descheduler/descheduler.go | 136 +++++++++++++++------------------
 1 file changed, 62 insertions(+), 74 deletions(-)

diff --git a/pkg/descheduler/descheduler.go b/pkg/descheduler/descheduler.go
index 9519fa43c..b50757e1b 100644
--- a/pkg/descheduler/descheduler.go
+++ b/pkg/descheduler/descheduler.go
@@ -23,6 +23,9 @@ import (
 	"strconv"
 	"time"
 
+	schedulingv1 "k8s.io/api/scheduling/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
 	"k8s.io/apimachinery/pkg/api/meta"
@@ -34,15 +37,12 @@ import (
 
 	v1 "k8s.io/api/core/v1"
 	policy "k8s.io/api/policy/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	utilversion "k8s.io/apimachinery/pkg/util/version"
 	"k8s.io/apimachinery/pkg/util/wait"
 	clientset "k8s.io/client-go/kubernetes"
 	fakeclientset "k8s.io/client-go/kubernetes/fake"
-	listersv1 "k8s.io/client-go/listers/core/v1"
-	schedulingv1 "k8s.io/client-go/listers/scheduling/v1"
 	core "k8s.io/client-go/testing"
 
 	"sigs.k8s.io/descheduler/pkg/descheduler/client"
@@ -71,10 +71,7 @@ type profileRunner struct {
 
 type descheduler struct {
 	rs *options.DeschedulerServer
-	podLister listersv1.PodLister
-	nodeLister listersv1.NodeLister
-	namespaceLister listersv1.NamespaceLister
-	priorityClassLister schedulingv1.PriorityClassLister
+	ir *informerResources
 	getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc
 	sharedInformerFactory informers.SharedInformerFactory
 	deschedulerPolicy *api.DeschedulerPolicy
@@ -83,12 +80,60 @@ type descheduler struct {
 	podEvictionReactionFnc func(*fakeclientset.Clientset) func(action core.Action) (bool, runtime.Object, error)
 }
 
+type informerResources struct {
+	sharedInformerFactory informers.SharedInformerFactory
+	resourceToInformer map[schema.GroupVersionResource]informers.GenericInformer
+}
+
+func newInformerResources(sharedInformerFactory informers.SharedInformerFactory) *informerResources {
+	return &informerResources{
+		sharedInformerFactory: sharedInformerFactory,
+		resourceToInformer: make(map[schema.GroupVersionResource]informers.GenericInformer),
+	}
+}
+
+func (ir *informerResources) Uses(resources ...schema.GroupVersionResource) error {
+	for _, resource := range resources {
+		informer, err := ir.sharedInformerFactory.ForResource(resource)
+		if err != nil {
+			return err
+		}
+
+		ir.resourceToInformer[resource] = informer
+	}
+	return nil
+}
+
+// CopyTo copies informer subscriptions to the new factory and objects to the fake client so that the backing caches are populated when listers are used.
+func (ir *informerResources) CopyTo(fakeClient *fakeclientset.Clientset, newFactory informers.SharedInformerFactory) error {
+	for resource, informer := range ir.resourceToInformer {
+		_, err := newFactory.ForResource(resource)
+		if err != nil {
+			return fmt.Errorf("error getting resource %s: %w", resource, err)
+		}
+
+		objects, err := informer.Lister().List(labels.Everything())
+		if err != nil {
+			return fmt.Errorf("error listing %s: %w", resource, err)
+		}
+
+		for _, object := range objects {
+			fakeClient.Tracker().Add(object)
+		}
+	}
+	return nil
+}
+
 func newDescheduler(rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string, eventRecorder events.EventRecorder, sharedInformerFactory informers.SharedInformerFactory) (*descheduler, error) {
 	podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
-	podLister := sharedInformerFactory.Core().V1().Pods().Lister()
-	nodeLister := sharedInformerFactory.Core().V1().Nodes().Lister()
-	namespaceLister := sharedInformerFactory.Core().V1().Namespaces().Lister()
-	priorityClassLister := sharedInformerFactory.Scheduling().V1().PriorityClasses().Lister()
+
+	ir := newInformerResources(sharedInformerFactory)
+	ir.Uses(v1.SchemeGroupVersion.WithResource("pods"),
+		v1.SchemeGroupVersion.WithResource("nodes"),
+		// Future work could be to let each plugin declare what type of resources it needs; that way dry runs would stay
+		// consistent with the real runs without having to keep the list here in sync.
+		v1.SchemeGroupVersion.WithResource("namespaces"), // Used by the defaultevictor plugin
+		schedulingv1.SchemeGroupVersion.WithResource("priorityclasses")) // Used by the defaultevictor plugin
 
 	getPodsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer)
 	if err != nil {
@@ -109,10 +154,7 @@ func newDescheduler(rs *options.DeschedulerServer, deschedulerPolicy *api.Desche
 
 	return &descheduler{
 		rs: rs,
-		podLister: podLister,
-		nodeLister: nodeLister,
-		namespaceLister: namespaceLister,
-		priorityClassLister: priorityClassLister,
+		ir: ir,
 		getPodsAssignedToNode: getPodsAssignedToNode,
 		sharedInformerFactory: sharedInformerFactory,
 		deschedulerPolicy: deschedulerPolicy,
@@ -146,13 +188,14 @@ func (d *descheduler) runDeschedulerLoop(ctx context.Context, nodes []*v1.Node)
 	fakeClient := fakeclientset.NewSimpleClientset()
 	// simulate a pod eviction by deleting a pod
 	fakeClient.PrependReactor("create", "pods", d.podEvictionReactionFnc(fakeClient))
-	err := cachedClient(d.rs.Client, fakeClient, d.podLister, d.nodeLister, d.namespaceLister, d.priorityClassLister)
+	fakeSharedInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
+
+	err := d.ir.CopyTo(fakeClient, fakeSharedInformerFactory)
 	if err != nil {
 		return err
 	}
 
 	// create a new instance of the shared informer factor from the cached client
-	fakeSharedInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
 	// register the pod informer, otherwise it will not get running
 	d.getPodsAssignedToNode, err = podutil.BuildGetPodsAssignedToNodeFunc(fakeSharedInformerFactory.Core().V1().Pods().Informer())
 	if err != nil {
@@ -336,62 +379,6 @@ func podEvictionReactionFnc(fakeClient *fakeclientset.Clientset) func(action cor
 	}
 }
 
-func cachedClient(
-	realClient clientset.Interface,
-	fakeClient *fakeclientset.Clientset,
-	podLister listersv1.PodLister,
-	nodeLister listersv1.NodeLister,
-	namespaceLister listersv1.NamespaceLister,
-	priorityClassLister schedulingv1.PriorityClassLister,
-) error {
-	klog.V(3).Infof("Pulling resources for the cached client from the cluster")
-	pods, err := podLister.List(labels.Everything())
-	if err != nil {
-		return fmt.Errorf("unable to list pods: %v", err)
-	}
-
-	for _, item := range pods {
-		if _, err := fakeClient.CoreV1().Pods(item.Namespace).Create(context.TODO(), item, metav1.CreateOptions{}); err != nil {
-			return fmt.Errorf("unable to copy pod: %v", err)
-		}
-	}
-
-	nodes, err := nodeLister.List(labels.Everything())
-	if err != nil {
-		return fmt.Errorf("unable to list nodes: %v", err)
-	}
-
-	for _, item := range nodes {
-		if _, err := fakeClient.CoreV1().Nodes().Create(context.TODO(), item, metav1.CreateOptions{}); err != nil {
-			return fmt.Errorf("unable to copy node: %v", err)
-		}
-	}
-
-	namespaces, err := namespaceLister.List(labels.Everything())
-	if err != nil {
-		return fmt.Errorf("unable to list namespaces: %v", err)
-	}
-
-	for _, item := range namespaces {
-		if _, err := fakeClient.CoreV1().Namespaces().Create(context.TODO(), item, metav1.CreateOptions{}); err != nil {
-			return fmt.Errorf("unable to copy namespace: %v", err)
-		}
-	}
-
-	priorityClasses, err := priorityClassLister.List(labels.Everything())
-	if err != nil {
-		return fmt.Errorf("unable to list priorityclasses: %v", err)
-	}
-
-	for _, item := range priorityClasses {
-		if _, err := fakeClient.SchedulingV1().PriorityClasses().Create(context.TODO(), item, metav1.CreateOptions{}); err != nil {
-			return fmt.Errorf("unable to copy priorityclass: %v", err)
-		}
-	}
-
-	return nil
-}
-
 func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string) error {
 	var span trace.Span
 	ctx, span = tracing.Tracer().Start(ctx, "RunDeschedulerStrategies")
@@ -428,7 +415,8 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
 	// A next context is created here intentionally to avoid nesting the spans via context.
 	sCtx, sSpan := tracing.Tracer().Start(ctx, "NonSlidingUntil")
 	defer sSpan.End()
-	nodes, err := nodeutil.ReadyNodes(sCtx, rs.Client, descheduler.nodeLister, nodeSelector)
+
+	nodes, err := nodeutil.ReadyNodes(sCtx, rs.Client, descheduler.sharedInformerFactory.Core().V1().Nodes().Lister(), nodeSelector)
 	if err != nil {
 		sSpan.AddEvent("Failed to detect ready nodes", trace.WithAttributes(attribute.String("err", err.Error())))
 		klog.Error(err)
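
Note: the following is a minimal usage sketch, not part of the patch above. It illustrates how the new informerResources helper is intended to be wired for a dry run: subscribe the real shared informer factory to the resources a run needs, then copy the cached objects into a fake client and a fresh factory built on top of it. The helper name buildDryRunFactory and the two-resource list are hypothetical; the client-go calls used (NewSimpleClientset, NewSharedInformerFactory, Start, WaitForCacheSync) are standard.

package descheduler

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	fakeclientset "k8s.io/client-go/kubernetes/fake"
)

// buildDryRunFactory is a hypothetical helper that mirrors the real factory's
// subscriptions into a fake client so that a dry run sees the same resource
// types and objects as a normal run.
func buildDryRunFactory(ctx context.Context, realFactory informers.SharedInformerFactory) (informers.SharedInformerFactory, error) {
	ir := newInformerResources(realFactory)

	// Subscribe to the resources the run depends on (illustrative list only).
	if err := ir.Uses(
		v1.SchemeGroupVersion.WithResource("pods"),
		v1.SchemeGroupVersion.WithResource("nodes"),
	); err != nil {
		return nil, err
	}

	// Make sure the real informers are started and synced before copying their contents.
	realFactory.Start(ctx.Done())
	realFactory.WaitForCacheSync(ctx.Done())

	// Copy the cached objects into a fake client and register the same informers
	// on a new factory, so listers backed by the fake client return the same
	// objects as the real cluster.
	fakeClient := fakeclientset.NewSimpleClientset()
	fakeFactory := informers.NewSharedInformerFactory(fakeClient, 0)
	if err := ir.CopyTo(fakeClient, fakeFactory); err != nil {
		return nil, err
	}

	fakeFactory.Start(ctx.Done())
	fakeFactory.WaitForCacheSync(ctx.Done())
	return fakeFactory, nil
}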