From 770ec5affa7f99e541e78ee52d8e78bb792edad3 Mon Sep 17 00:00:00 2001 From: Jan Chaloupka Date: Thu, 22 Jan 2026 17:21:13 +0100 Subject: [PATCH] refactor(pkg/descheduler): create fake shared informer factory only once --- pkg/descheduler/descheduler.go | 358 +++++++++++++++++++------ pkg/descheduler/descheduler_test.go | 247 ++++++++++++++--- pkg/descheduler/evictions/evictions.go | 6 - test/test_utils.go | 24 ++ 4 files changed, 504 insertions(+), 131 deletions(-) diff --git a/pkg/descheduler/descheduler.go b/pkg/descheduler/descheduler.go index 0fba7fb0c..6f7095972 100644 --- a/pkg/descheduler/descheduler.go +++ b/pkg/descheduler/descheduler.go @@ -36,6 +36,7 @@ import ( schedulingv1 "k8s.io/api/scheduling/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -133,6 +134,7 @@ func (c *evictedPodsCache) clear() { type descheduler struct { rs *options.DeschedulerServer + client clientset.Interface kubeClientSandbox *kubeClientSandbox getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc sharedInformerFactory informers.SharedInformerFactory @@ -151,8 +153,6 @@ type descheduler struct { // kubeClientSandbox creates a sandbox environment with a fake client and informer factory // that mirrors resources from a real client, useful for dry-run testing scenarios type kubeClientSandbox struct { - client clientset.Interface - sharedInformerFactory informers.SharedInformerFactory fakeKubeClient *fakeclientset.Clientset fakeFactory informers.SharedInformerFactory resourceToInformer map[schema.GroupVersionResource]informers.GenericInformer @@ -173,13 +173,16 @@ func newDefaultKubeClientSandbox(client clientset.Interface, sharedInformerFacto func newKubeClientSandbox(client clientset.Interface, sharedInformerFactory informers.SharedInformerFactory, resources ...schema.GroupVersionResource) (*kubeClientSandbox, error) { sandbox := &kubeClientSandbox{ - client: client, - sharedInformerFactory: sharedInformerFactory, resourceToInformer: make(map[schema.GroupVersionResource]informers.GenericInformer), evictedPodsCache: newEvictedPodsCache(), podEvictionReactionFnc: podEvictionReactionFnc, } + sandbox.fakeKubeClient = fakeclientset.NewSimpleClientset() + // simulate a pod eviction by deleting a pod + sandbox.fakeKubeClient.PrependReactor("create", "pods", sandbox.podEvictionReactionFnc(sandbox.fakeKubeClient, sandbox.evictedPodsCache)) + sandbox.fakeFactory = informers.NewSharedInformerFactory(sandbox.fakeKubeClient, 0) + for _, resource := range resources { informer, err := sharedInformerFactory.ForResource(resource) if err != nil { @@ -188,33 +191,78 @@ func newKubeClientSandbox(client clientset.Interface, sharedInformerFactory info sandbox.resourceToInformer[resource] = informer } + // Register event handlers to sync changes from real client to fake client. + // These handlers will keep the fake client in sync with ongoing changes. 
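+	// client-go delivers Add events for every object already present in the informer's
+	// store when a handler is registered (and for objects synced after the factory
+	// starts), so the fake client is seeded without the explicit List/Tracker().Add
+	// pass that the removed buildSandbox performed.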
+ if err := sandbox.registerEventHandlers(); err != nil { + return nil, fmt.Errorf("error registering event handlers: %w", err) + } + return sandbox, nil } -func (sandbox *kubeClientSandbox) buildSandbox() error { - sandbox.fakeKubeClient = fakeclientset.NewSimpleClientset() - // simulate a pod eviction by deleting a pod - sandbox.fakeKubeClient.PrependReactor("create", "pods", sandbox.podEvictionReactionFnc(sandbox.fakeKubeClient, sandbox.evictedPodsCache)) - sandbox.fakeFactory = informers.NewSharedInformerFactory(sandbox.fakeKubeClient, 0) - +func (sandbox *kubeClientSandbox) registerEventHandlers() error { for resource, informer := range sandbox.resourceToInformer { + // Create a local copy to avoid closure capture issue + resource := resource + _, err := sandbox.fakeFactory.ForResource(resource) if err != nil { - return fmt.Errorf("error getting resource %s: %w", resource, err) + return fmt.Errorf("error getting resource %s for fake factory: %w", resource, err) } - objects, err := informer.Lister().List(labels.Everything()) + _, err = informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + runtimeObj, ok := obj.(runtime.Object) + if !ok { + klog.ErrorS(nil, "object is not a runtime.Object", "resource", resource) + return + } + if err := sandbox.fakeKubeClient.Tracker().Add(runtimeObj); err != nil { + if !apierrors.IsAlreadyExists(err) { + klog.ErrorS(err, "failed to add object to fake client", "resource", resource) + } + } + }, + UpdateFunc: func(oldObj, newObj interface{}) { + runtimeObj, ok := newObj.(runtime.Object) + if !ok { + klog.ErrorS(nil, "object is not a runtime.Object", "resource", resource) + return + } + metaObj, err := meta.Accessor(runtimeObj) + if err != nil { + klog.ErrorS(err, "failed to get object metadata", "resource", resource) + return + } + if err := sandbox.fakeKubeClient.Tracker().Update(resource, runtimeObj, metaObj.GetNamespace()); err != nil { + klog.ErrorS(err, "failed to update object in fake client", "resource", resource) + } + }, + DeleteFunc: func(obj interface{}) { + // Handle tombstone case where the object might be wrapped + if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { + obj = tombstone.Obj + } + + runtimeObj, ok := obj.(runtime.Object) + if !ok { + klog.ErrorS(nil, "object is not a runtime.Object", "resource", resource) + return + } + metaObj, err := meta.Accessor(runtimeObj) + if err != nil { + klog.ErrorS(err, "failed to get object metadata", "resource", resource) + return + } + if err := sandbox.fakeKubeClient.Tracker().Delete(resource, metaObj.GetNamespace(), metaObj.GetName()); err != nil { + klog.ErrorS(err, "failed to delete object from fake client", "resource", resource) + } + }, + }) if err != nil { - return fmt.Errorf("error listing %s: %w", informer, err) - } - - for _, object := range objects { - if err := sandbox.fakeKubeClient.Tracker().Add(object); err != nil { - return fmt.Errorf("error adding object to tracker: %w", err) - } + return fmt.Errorf("error adding event handler for resource %s: %w", resource, err) } } - return nil } @@ -230,6 +278,161 @@ func (sandbox *kubeClientSandbox) reset() { sandbox.evictedPodsCache.clear() } +// hasObjectInIndexer checks if an object exists in the fake indexer for the specified resource +func (sandbox *kubeClientSandbox) hasObjectInIndexer(resource schema.GroupVersionResource, namespace, name string) (bool, error) { + informer, err := sandbox.fakeFactory.ForResource(resource) + if err != nil { + return false, fmt.Errorf("error 
getting informer for resource %s: %w", resource, err) + } + + key := cache.MetaObjectToName(&metav1.ObjectMeta{Namespace: namespace, Name: name}).String() + _, exists, err := informer.Informer().GetIndexer().GetByKey(key) + if err != nil { + return false, err + } + + return exists, nil +} + +// hasRuntimeObjectInIndexer checks if a runtime.Object exists in the fake indexer by detecting its resource type +func (sandbox *kubeClientSandbox) hasRuntimeObjectInIndexer(obj runtime.Object) (bool, error) { + // Get metadata accessor to extract namespace and name + metaObj, err := meta.Accessor(obj) + if err != nil { + return false, fmt.Errorf("failed to get object metadata: %w", err) + } + + // Get the GVK from the object using TypeMeta + gvk := obj.GetObjectKind().GroupVersionKind() + if gvk.Empty() { + return false, fmt.Errorf("no GroupVersionKind found for object") + } + + // Use the GVK to construct the GVR by pluralizing the kind + plural, _ := meta.UnsafeGuessKindToResource(gvk) + gvr := schema.GroupVersionResource{ + Group: gvk.Group, + Version: gvk.Version, + Resource: plural.Resource, + } + + return sandbox.hasObjectInIndexer(gvr, metaObj.GetNamespace(), metaObj.GetName()) +} + +func waitForPodsCondition(ctx context.Context, pods []*evictedPodInfo, checkFn func(*evictedPodInfo) (bool, error), successMsg string) error { + if len(pods) == 0 { + return nil + } + + err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { + for _, pod := range pods { + satisfied, err := checkFn(pod) + if err != nil { + return false, err + } + if !satisfied { + return false, nil + } + } + return true, nil + }) + if err != nil { + return err + } + + klog.V(4).InfoS(successMsg) + return nil +} + +// restoreEvictedPods restores pods from the evicted pods cache back to the fake client +func (sandbox *kubeClientSandbox) restoreEvictedPods(ctx context.Context) error { + podInformer, ok := sandbox.resourceToInformer[v1.SchemeGroupVersion.WithResource("pods")] + if !ok { + return fmt.Errorf("pod informer not found in resourceToInformer") + } + + evictedPods := sandbox.evictedPodsCache.list() + + // First wait loop: Check that all evicted pods are cleared from the indexers. + // This ensures the eviction has fully propagated through the fake informer's indexer. 
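+	// The eviction reactor updates the fake clientset's tracker synchronously, but the
+	// resulting watch events reach the fake informer's indexer asynchronously, which is
+	// why this polls instead of checking once.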
+ if err := waitForPodsCondition(ctx, evictedPods, func(pod *evictedPodInfo) (bool, error) { + exists, err := sandbox.hasObjectInIndexer(v1.SchemeGroupVersion.WithResource("pods"), pod.Namespace, pod.Name) + if err != nil { + klog.V(4).InfoS("Error checking indexer for pod", "namespace", pod.Namespace, "name", pod.Name, "error", err) + return false, nil + } + if exists { + klog.V(4).InfoS("Pod still exists in fake indexer, waiting", "namespace", pod.Namespace, "name", pod.Name) + return false, nil + } + klog.V(4).InfoS("Pod no longer in fake indexer", "namespace", pod.Namespace, "name", pod.Name) + return true, nil + }, "All evicted pods removed from fake indexer"); err != nil { + return fmt.Errorf("timeout waiting for evicted pods to be removed from fake indexer: %w", err) + } + + var restoredPods []*evictedPodInfo + for _, evictedPodInfo := range sandbox.evictedPodsCache.list() { + obj, err := podInformer.Lister().ByNamespace(evictedPodInfo.Namespace).Get(evictedPodInfo.Name) + if err != nil { + klog.V(3).InfoS("Pod not found in real client, skipping restoration", "namespace", evictedPodInfo.Namespace, "name", evictedPodInfo.Name, "error", err) + continue + } + + pod, ok := obj.(*v1.Pod) + if !ok { + klog.ErrorS(nil, "Object is not a pod", "namespace", evictedPodInfo.Namespace, "name", evictedPodInfo.Name) + continue + } + + if string(pod.UID) != evictedPodInfo.UID { + klog.V(3).InfoS("Pod UID mismatch, skipping restoration", "namespace", evictedPodInfo.Namespace, "name", evictedPodInfo.Name, "expectedUID", evictedPodInfo.UID, "actualUID", string(pod.UID)) + continue + } + + if err := sandbox.fakeKubeClient.Tracker().Add(pod); err != nil && !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("failed to restore pod %s/%s to fake client: %w", evictedPodInfo.Namespace, evictedPodInfo.Name, err) + } + klog.V(4).InfoS("Successfully restored pod to fake client", "namespace", evictedPodInfo.Namespace, "name", evictedPodInfo.Name, "uid", evictedPodInfo.UID) + restoredPods = append(restoredPods, evictedPodInfo) + } + + // Second wait loop: Make sure the evicted pods are added back to the fake client. + // This ensures the restored pods are accessible through the fake informer's lister. + if err := waitForPodsCondition(ctx, restoredPods, func(pod *evictedPodInfo) (bool, error) { + podObj, err := sandbox.fakeFactory.Core().V1().Pods().Lister().Pods(pod.Namespace).Get(pod.Name) + if err != nil { + klog.V(4).InfoS("Pod not yet accessible in fake informer, waiting", "namespace", pod.Namespace, "name", pod.Name) + return false, nil + } + klog.V(4).InfoS("Pod accessible in fake informer", "namespace", pod.Namespace, "name", pod.Name, "node", podObj.Spec.NodeName) + return true, nil + }, "All restored pods are accessible in fake informer"); err != nil { + return fmt.Errorf("timeout waiting for pods to be accessible in fake informer: %w", err) + } + + // Third wait loop: Make sure the indexers see the added pods. + // This is important to ensure each descheduling cycle can see all the restored pods. + // Without this wait, the next cycle might not see the restored pods in the indexer yet. 
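+	// In particular, getPodsAssignedToNode is backed by an index on this same pod
+	// informer, so the next cycle's per-node pod listing depends on the indexer being
+	// up to date, not only on the lister checked above.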
+ if err := waitForPodsCondition(ctx, restoredPods, func(pod *evictedPodInfo) (bool, error) { + exists, err := sandbox.hasObjectInIndexer(v1.SchemeGroupVersion.WithResource("pods"), pod.Namespace, pod.Name) + if err != nil { + klog.V(4).InfoS("Error checking indexer for restored pod", "namespace", pod.Namespace, "name", pod.Name, "error", err) + return false, nil + } + if !exists { + klog.V(4).InfoS("Restored pod not yet in fake indexer, waiting", "namespace", pod.Namespace, "name", pod.Name) + return false, nil + } + klog.V(4).InfoS("Restored pod now in fake indexer", "namespace", pod.Namespace, "name", pod.Name) + return true, nil + }, "All restored pods are now in fake indexer"); err != nil { + return fmt.Errorf("timeout waiting for restored pods to appear in fake indexer: %w", err) + } + + return nil +} + func nodeSelectorFromPolicy(deschedulerPolicy *api.DeschedulerPolicy) (labels.Selector, error) { nodeSelector := labels.Everything() if deschedulerPolicy.NodeSelector != nil { @@ -246,30 +449,6 @@ func addNodeSelectorIndexer(sharedInformerFactory informers.SharedInformerFactor return nodeutil.AddNodeSelectorIndexer(sharedInformerFactory.Core().V1().Nodes().Informer(), indexerNodeSelectorGlobal, nodeSelector) } -func setupInformerIndexers(sharedInformerFactory informers.SharedInformerFactory, deschedulerPolicy *api.DeschedulerPolicy) (podutil.GetPodsAssignedToNodeFunc, error) { - // create a new instance of the shared informer factory from the cached client - // register the pod informer, otherwise it will not get running - getPodsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(sharedInformerFactory.Core().V1().Pods().Informer()) - if err != nil { - return nil, fmt.Errorf("build get pods assigned to node function error: %v", err) - } - - // TODO(ingvagabund): copy paste all relevant indexers from the real client to the fake one - // TODO(ingvagabund): register one indexer per each profile. Respect the precedence of no profile-level node selector is specified. - // Also, keep a cache of node label selectors to detect duplicates to avoid creating an extra informer. 
- - nodeSelector, err := nodeSelectorFromPolicy(deschedulerPolicy) - if err != nil { - return nil, err - } - - if err := addNodeSelectorIndexer(sharedInformerFactory, nodeSelector); err != nil { - return nil, err - } - - return getPodsAssignedToNode, nil -} - func metricsProviderListToMap(providersList []api.MetricsProvider) map[api.MetricsSource]*api.MetricsProvider { providersMap := make(map[api.MetricsSource]*api.MetricsProvider) for _, provider := range providersList { @@ -295,15 +474,11 @@ func setupPrometheusProvider(d *descheduler, namespacedSharedInformerFactory inf return nil } -func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string, eventRecorder events.EventRecorder, sharedInformerFactory, namespacedSharedInformerFactory informers.SharedInformerFactory) (*descheduler, error) { +func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string, eventRecorder events.EventRecorder, client clientset.Interface, sharedInformerFactory informers.SharedInformerFactory, kubeClientSandbox *kubeClientSandbox) (*descheduler, error) { podInformer := sharedInformerFactory.Core().V1().Pods().Informer() - - // Future work could be to let each plugin declare what type of resources it needs; that way dry runs would stay - // consistent with the real runs without having to keep the list here in sync. - kubeClientSandbox, err := newDefaultKubeClientSandbox(rs.Client, sharedInformerFactory) - if err != nil { - return nil, fmt.Errorf("failed to create kube client sandbox: %v", err) - } + // Temporarily register the PVC because it is used by the DefaultEvictor plugin during + // the descheduling cycle, where informer registration is ignored. + _ = sharedInformerFactory.Core().V1().PersistentVolumeClaims().Informer() getPodsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer) if err != nil { @@ -312,7 +487,7 @@ func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedu podEvictor, err := evictions.NewPodEvictor( ctx, - rs.Client, + client, eventRecorder, podInformer, rs.DefaultFeatureGates, @@ -332,6 +507,7 @@ func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedu desch := &descheduler{ rs: rs, + client: client, kubeClientSandbox: kubeClientSandbox, getPodsAssignedToNode: getPodsAssignedToNode, sharedInformerFactory: sharedInformerFactory, @@ -479,42 +655,20 @@ func (d *descheduler) runDeschedulerLoop(ctx context.Context) error { metrics.LoopDuration.With(map[string]string{}).Observe(time.Since(loopStartDuration).Seconds()) }(time.Now()) - var client clientset.Interface - // When the dry mode is enable, collect all the relevant objects (mostly pods) under a fake client. - // So when evicting pods while running multiple strategies in a row have the cummulative effect - // as is when evicting pods for real. 
- if d.rs.DryRun { - klog.V(3).Infof("Building a cached client from the cluster for the dry run") - // Create a new cache so we start from scratch without any leftovers - err := d.kubeClientSandbox.buildSandbox() - if err != nil { - return err - } - - getPodsAssignedToNode, err := setupInformerIndexers(d.kubeClientSandbox.fakeSharedInformerFactory(), d.deschedulerPolicy) - if err != nil { - return err - } - d.getPodsAssignedToNode = getPodsAssignedToNode - - fakeCtx, cncl := context.WithCancel(context.TODO()) - defer cncl() - d.kubeClientSandbox.fakeSharedInformerFactory().Start(fakeCtx.Done()) - d.kubeClientSandbox.fakeSharedInformerFactory().WaitForCacheSync(fakeCtx.Done()) - - client = d.kubeClientSandbox.fakeClient() - d.sharedInformerFactory = d.kubeClientSandbox.fakeSharedInformerFactory() - } else { - client = d.rs.Client - } - - klog.V(3).Infof("Setting up the pod evictor") - d.podEvictor.SetClient(client) + klog.V(3).Infof("Resetting pod evictor counters") d.podEvictor.ResetCounters() - d.runProfiles(ctx, client) + d.runProfiles(ctx, d.client) if d.rs.DryRun { + if d.kubeClientSandbox == nil { + return fmt.Errorf("kubeClientSandbox is nil in DryRun mode") + } + klog.V(3).Infof("Restoring evicted pods from cache") + if err := d.kubeClientSandbox.restoreEvictedPods(ctx); err != nil { + klog.ErrorS(err, "Failed to restore evicted pods") + return fmt.Errorf("failed to restore evicted pods: %w", err) + } d.kubeClientSandbox.reset() } @@ -762,7 +916,8 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer } } - descheduler, err := newDescheduler(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion, eventRecorder, sharedInformerFactory, namespacedSharedInformerFactory) + // Always create descheduler with real client/factory first to register all informers + descheduler, err := newDescheduler(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion, eventRecorder, rs.Client, sharedInformerFactory, nil) if err != nil { span.AddEvent("Failed to create new descheduler", trace.WithAttributes(attribute.String("err", err.Error()))) return err @@ -776,17 +931,46 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer return err } + // If in dry run mode, replace the descheduler with one using fake client/factory + if rs.DryRun { + // Create sandbox with resources to mirror from real client + kubeClientSandbox, err := newDefaultKubeClientSandbox(rs.Client, sharedInformerFactory) + if err != nil { + span.AddEvent("Failed to create kube client sandbox", trace.WithAttributes(attribute.String("err", err.Error()))) + return fmt.Errorf("failed to create kube client sandbox: %v", err) + } + + klog.V(3).Infof("Building a cached client from the cluster for the dry run") + + // TODO(ingvagabund): drop the previous queue + // TODO(ingvagabund): stop the previous pod evictor + // Replace descheduler with one using fake client/factory + descheduler, err = newDescheduler(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion, eventRecorder, kubeClientSandbox.fakeClient(), kubeClientSandbox.fakeSharedInformerFactory(), kubeClientSandbox) + if err != nil { + span.AddEvent("Failed to create dry run descheduler", trace.WithAttributes(attribute.String("err", err.Error()))) + return err + } + } + + // In dry run mode, start and sync the fake shared informer factory so it can mirror + // events from the real factory. Reliable propagation depends on both factories being + // fully synced (see WaitForCacheSync calls below), not solely on startup order. 
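+	// The sandbox registered its event handlers on the real factory's informers in
+	// newKubeClientSandbox, so objects flow real factory -> fake client -> fake factory
+	// once both factories are started below.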
+ if rs.DryRun { + descheduler.kubeClientSandbox.fakeSharedInformerFactory().Start(ctx.Done()) + descheduler.kubeClientSandbox.fakeSharedInformerFactory().WaitForCacheSync(ctx.Done()) + } sharedInformerFactory.Start(ctx.Done()) if metricProviderTokenReconciliation == secretReconciliation { namespacedSharedInformerFactory.Start(ctx.Done()) } sharedInformerFactory.WaitForCacheSync(ctx.Done()) - descheduler.podEvictor.WaitForEventHandlersSync(ctx) if metricProviderTokenReconciliation == secretReconciliation { namespacedSharedInformerFactory.WaitForCacheSync(ctx.Done()) } + descheduler.podEvictor.WaitForEventHandlersSync(ctx) + if descheduler.metricsCollector != nil { go func() { klog.V(2).Infof("Starting metrics collector") diff --git a/pkg/descheduler/descheduler_test.go b/pkg/descheduler/descheduler_test.go index e59e64ebc..7fdc6e566 100644 --- a/pkg/descheduler/descheduler_test.go +++ b/pkg/descheduler/descheduler_test.go @@ -13,15 +13,18 @@ import ( v1 "k8s.io/api/core/v1" policy "k8s.io/api/policy/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" apiversion "k8s.io/apimachinery/pkg/version" fakediscovery "k8s.io/client-go/discovery/fake" "k8s.io/client-go/informers" - clientset "k8s.io/client-go/kubernetes" fakeclientset "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" "k8s.io/component-base/featuregate" "k8s.io/klog/v2" "k8s.io/metrics/pkg/apis/metrics/v1beta1" @@ -196,20 +199,68 @@ func initDescheduler(t *testing.T, ctx context.Context, featureGates featuregate sharedInformerFactory := informers.NewSharedInformerFactoryWithOptions(rs.Client, 0, informers.WithTransform(trimManagedFields)) eventBroadcaster, eventRecorder := utils.GetRecorderAndBroadcaster(ctx, client) - descheduler, err := newDescheduler(ctx, rs, internalDeschedulerPolicy, "v1", eventRecorder, sharedInformerFactory, nil) + // Always create descheduler with real client/factory first to register all informers + descheduler, err := newDescheduler(ctx, rs, internalDeschedulerPolicy, "v1", eventRecorder, rs.Client, sharedInformerFactory, nil) if err != nil { eventBroadcaster.Shutdown() - t.Fatalf("Unable to create a descheduler instance: %v", err) + t.Fatalf("Unable to create descheduler instance: %v", err) } + // Setup Prometheus provider (only for real client case, not for dry run) if err := setupPrometheusProvider(descheduler, nil); err != nil { eventBroadcaster.Shutdown() t.Fatalf("Failed to setup Prometheus provider: %v", err) } + // If in dry run mode, replace the descheduler with one using fake client/factory + if dryRun { + // Create sandbox with resources to mirror from real client + kubeClientSandbox, err := newDefaultKubeClientSandbox(rs.Client, sharedInformerFactory) + if err != nil { + eventBroadcaster.Shutdown() + t.Fatalf("Failed to create kube client sandbox: %v", err) + } + + // Replace descheduler with one using fake client/factory + descheduler, err = newDescheduler(ctx, rs, internalDeschedulerPolicy, "v1", eventRecorder, kubeClientSandbox.fakeClient(), kubeClientSandbox.fakeSharedInformerFactory(), kubeClientSandbox) + if err != nil { + eventBroadcaster.Shutdown() + t.Fatalf("Unable to create dry run descheduler instance: %v", err) + } + } + + // Start the real shared informer factory after creating the descheduler + if dryRun { 
+ descheduler.kubeClientSandbox.fakeSharedInformerFactory().Start(ctx.Done()) + descheduler.kubeClientSandbox.fakeSharedInformerFactory().WaitForCacheSync(ctx.Done()) + } sharedInformerFactory.Start(ctx.Done()) sharedInformerFactory.WaitForCacheSync(ctx.Done()) + if dryRun { + if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { + for _, obj := range objects { + exists, err := descheduler.kubeClientSandbox.hasRuntimeObjectInIndexer(obj) + if err != nil { + return false, err + } + metaObj, err := meta.Accessor(obj) + if err != nil { + return false, fmt.Errorf("failed to get object metadata: %w", err) + } + key := cache.MetaObjectToName(metaObj).String() + if !exists { + klog.Infof("Object %q has not propagated to the indexer", key) + return false, nil + } + klog.Infof("Object %q has propagated to the indexer", key) + } + return true, nil + }); err != nil { + t.Fatalf("nodes did not propagate to the indexer: %v", err) + } + } + return rs, descheduler, client } @@ -550,36 +601,12 @@ func TestPodEvictorReset(t *testing.T) { } } -// runDeschedulerLoopAndGetEvictedPods is a temporary duplication from runDeschedulerLoop -// that will be removed after kubeClientSandbox gets migrated to event handlers. +// runDeschedulerLoopAndGetEvictedPods runs a descheduling cycle and returns the names of evicted pods. +// This is similar to runDeschedulerLoop but captures evicted pod names before the cache is reset. func runDeschedulerLoopAndGetEvictedPods(ctx context.Context, t *testing.T, d *descheduler, dryRun bool) []string { - var clientSet clientset.Interface - if dryRun { - if err := d.kubeClientSandbox.buildSandbox(); err != nil { - t.Fatalf("Failed to build sandbox: %v", err) - } - - getPodsAssignedToNode, err := setupInformerIndexers(d.kubeClientSandbox.fakeSharedInformerFactory(), d.deschedulerPolicy) - if err != nil { - t.Fatalf("Failed to setup indexers: %v", err) - } - d.getPodsAssignedToNode = getPodsAssignedToNode - - fakeCtx, cncl := context.WithCancel(context.TODO()) - defer cncl() - d.kubeClientSandbox.fakeSharedInformerFactory().Start(fakeCtx.Done()) - d.kubeClientSandbox.fakeSharedInformerFactory().WaitForCacheSync(fakeCtx.Done()) - - clientSet = d.kubeClientSandbox.fakeClient() - d.sharedInformerFactory = d.kubeClientSandbox.fakeSharedInformerFactory() - } else { - clientSet = d.rs.Client - } - - d.podEvictor.SetClient(clientSet) d.podEvictor.ResetCounters() - d.runProfiles(ctx, clientSet) + d.runProfiles(ctx, d.client) var evictedPodNames []string if dryRun { @@ -587,6 +614,10 @@ func runDeschedulerLoopAndGetEvictedPods(ctx context.Context, t *testing.T, d *d for _, pod := range evictedPodsFromCache { evictedPodNames = append(evictedPodNames, pod.Name) } + + if err := d.kubeClientSandbox.restoreEvictedPods(ctx); err != nil { + t.Fatalf("Failed to restore evicted pods: %v", err) + } d.kubeClientSandbox.reset() } @@ -962,6 +993,8 @@ func TestNodeLabelSelectorBasedEviction(t *testing.T) { p3 := test.BuildTestPod("p3", 200, 0, node3.Name, updatePod) p4 := test.BuildTestPod("p4", 200, 0, node4.Name, updatePod) + objects := []runtime.Object{node1, node2, node3, node4, p1, p2, p3, p4} + // Map pod names to their node names for validation podToNode := map[string]string{ "p1": "n1", @@ -976,7 +1009,7 @@ func TestNodeLabelSelectorBasedEviction(t *testing.T) { } ctxCancel, cancel := context.WithCancel(ctx) - _, deschedulerInstance, client := initDescheduler(t, ctxCancel, initFeatureGates(), policy, nil, tc.dryRun, node1, 
node2, node3, node4, p1, p2, p3, p4) + _, deschedulerInstance, client := initDescheduler(t, ctxCancel, initFeatureGates(), policy, nil, tc.dryRun, objects...) defer cancel() // Verify all pods are created initially @@ -1008,13 +1041,13 @@ func TestNodeLabelSelectorBasedEviction(t *testing.T) { // Verify the correct number of nodes had pods evicted if len(nodesWithEvictedPods) != len(tc.expectedEvictedFromNodes) { - t.Errorf("Expected pods to be evicted from %d nodes, got %d nodes: %v", len(tc.expectedEvictedFromNodes), len(nodesWithEvictedPods), nodesWithEvictedPods) + t.Fatalf("Expected pods to be evicted from %d nodes, got %d nodes: %v", len(tc.expectedEvictedFromNodes), len(nodesWithEvictedPods), nodesWithEvictedPods) } // Verify pods were evicted from the correct nodes for _, nodeName := range tc.expectedEvictedFromNodes { if !nodesWithEvictedPods[nodeName] { - t.Errorf("Expected pod to be evicted from node %s, but it was not", nodeName) + t.Fatalf("Expected pod to be evicted from node %s, but it was not", nodeName) } } @@ -1028,7 +1061,7 @@ func TestNodeLabelSelectorBasedEviction(t *testing.T) { } } if !found { - t.Errorf("Unexpected eviction from node %s", nodeName) + t.Fatalf("Unexpected eviction from node %s", nodeName) } } @@ -1141,10 +1174,6 @@ func TestKubeClientSandboxReset(t *testing.T) { t.Fatalf("Failed to create kubeClientSandbox: %v", err) } - if err := sandbox.buildSandbox(); err != nil { - t.Fatalf("Failed to build sandbox: %v", err) - } - eviction1 := &policy.Eviction{ ObjectMeta: metav1.ObjectMeta{ Name: p1.Name, @@ -1158,6 +1187,9 @@ func TestKubeClientSandboxReset(t *testing.T) { }, } + sandbox.fakeSharedInformerFactory().Start(ctx.Done()) + sandbox.fakeSharedInformerFactory().WaitForCacheSync(ctx.Done()) + if err := sandbox.fakeClient().CoreV1().Pods(p1.Namespace).EvictV1(context.TODO(), eviction1); err != nil { t.Fatalf("Error evicting p1: %v", err) } @@ -1486,3 +1518,142 @@ func TestPodEvictionReactionFncErrorHandling(t *testing.T) { }) } } + +// verifyPodIdentityFields checks if name, namespace, and UID match expected values +func verifyPodIdentityFields(t *testing.T, name, namespace, uid, expectedName, expectedNamespace, expectedUID, context string) { + t.Helper() + if name != expectedName { + t.Fatalf("Expected pod name %s%s, got %s", expectedName, context, name) + } + if namespace != expectedNamespace { + t.Fatalf("Expected pod namespace %s%s, got %s", expectedNamespace, context, namespace) + } + if uid != expectedUID { + t.Fatalf("Expected pod UID %s%s, got %s", expectedUID, context, uid) + } +} + +// verifyPodIdentity checks if a pod has the expected name, namespace, and UID +func verifyPodIdentity(t *testing.T, pod *v1.Pod, expectedName, expectedNamespace string, expectedUID types.UID) { + t.Helper() + verifyPodIdentityFields(t, pod.Name, pod.Namespace, string(pod.UID), expectedName, expectedNamespace, string(expectedUID), "") +} + +func TestEvictedPodRestorationInDryRun(t *testing.T) { + // Initialize klog flags + // klog.InitFlags(nil) + + // Set verbosity level (higher number = more verbose) + // 0 = errors only, 1-4 = info, 5-9 = debug, 10+ = trace + // flag.Set("v", "4") + + initPluginRegistry() + + ctx := context.Background() + node1 := test.BuildTestNode("n1", 2000, 3000, 10, taintNodeNoSchedule) + node2 := test.BuildTestNode("n2", 2000, 3000, 10, nil) + + p1 := test.BuildTestPod("p1", 100, 0, node1.Name, test.SetRSOwnerRef) + + internalDeschedulerPolicy := removePodsViolatingNodeTaintsPolicy() + ctxCancel, cancel := context.WithCancel(ctx) + defer 
cancel() + + // Create descheduler with DryRun mode + client := fakeclientset.NewSimpleClientset(node1, node2, p1) + eventClient := fakeclientset.NewSimpleClientset(node1, node2, p1) + + rs, err := options.NewDeschedulerServer() + if err != nil { + t.Fatalf("Unable to initialize server: %v", err) + } + rs.Client = client + rs.EventClient = eventClient + rs.DefaultFeatureGates = initFeatureGates() + rs.DryRun = true // Set DryRun before creating descheduler + + sharedInformerFactory := informers.NewSharedInformerFactoryWithOptions(rs.Client, 0, informers.WithTransform(trimManagedFields)) + eventBroadcaster, eventRecorder := utils.GetRecorderAndBroadcaster(ctxCancel, client) + defer eventBroadcaster.Shutdown() + + // Always create descheduler with real client/factory first to register all informers + descheduler, err := newDescheduler(ctxCancel, rs, internalDeschedulerPolicy, "v1", eventRecorder, rs.Client, sharedInformerFactory, nil) + if err != nil { + t.Fatalf("Unable to create descheduler instance: %v", err) + } + + sharedInformerFactory.Start(ctxCancel.Done()) + sharedInformerFactory.WaitForCacheSync(ctxCancel.Done()) + + // Create sandbox with resources to mirror from real client + kubeClientSandbox, err := newDefaultKubeClientSandbox(rs.Client, sharedInformerFactory) + if err != nil { + t.Fatalf("Failed to create kube client sandbox: %v", err) + } + + // Replace descheduler with one using fake client/factory + descheduler, err = newDescheduler(ctxCancel, rs, internalDeschedulerPolicy, "v1", eventRecorder, kubeClientSandbox.fakeClient(), kubeClientSandbox.fakeSharedInformerFactory(), kubeClientSandbox) + if err != nil { + t.Fatalf("Unable to create dry run descheduler instance: %v", err) + } + + // Start and sync the fake factory after creating the descheduler + kubeClientSandbox.fakeSharedInformerFactory().Start(ctxCancel.Done()) + kubeClientSandbox.fakeSharedInformerFactory().WaitForCacheSync(ctxCancel.Done()) + + // Verify the pod exists in the fake client after initialization + pod, err := kubeClientSandbox.fakeClient().CoreV1().Pods(p1.Namespace).Get(ctx, p1.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Expected pod %s to exist in fake client after initialization, but got error: %v", p1.Name, err) + } + verifyPodIdentity(t, pod, p1.Name, p1.Namespace, p1.UID) + klog.Infof("Pod %s exists in fake client after initialization", p1.Name) + + // Run two descheduling cycles to verify pod eviction and restoration works repeatedly + for i := 1; i <= 2; i++ { + // Run descheduling cycle + klog.Infof("Running descheduling cycle %d", i) + descheduler.podEvictor.ResetCounters() + descheduler.runProfiles(ctx, descheduler.client) + + // Verify the pod was evicted (should not exist in fake client anymore) + _, err = kubeClientSandbox.fakeClient().CoreV1().Pods(p1.Namespace).Get(ctx, p1.Name, metav1.GetOptions{}) + if err == nil { + t.Fatalf("Expected pod %s to be evicted from fake client in cycle %d, but it still exists", p1.Name, i) + } + if !apierrors.IsNotFound(err) { + t.Fatalf("Expected NotFound error for pod %s in cycle %d, got: %v", p1.Name, i, err) + } + klog.Infof("Pod %s was successfully evicted from fake client in cycle %d", p1.Name, i) + + // Verify the pod was added to the evicted pods cache + evictedPods := descheduler.kubeClientSandbox.evictedPodsCache.list() + if len(evictedPods) != 1 { + t.Fatalf("Expected 1 pod in evicted cache in cycle %d, got %d", i, len(evictedPods)) + } + verifyPodIdentityFields(t, evictedPods[0].Name, evictedPods[0].Namespace, evictedPods[0].UID, 
p1.Name, p1.Namespace, string(p1.UID), fmt.Sprintf(" in cycle %d", i)) + klog.Infof("Pod %s was successfully added to evicted pods cache in cycle %d (UID: %s)", p1.Name, i, p1.UID) + + // Restore evicted pods + klog.Infof("Restoring evicted pods from cache in cycle %d", i) + if err := descheduler.kubeClientSandbox.restoreEvictedPods(ctx); err != nil { + t.Fatalf("Failed to restore evicted pods in cycle %d: %v", i, err) + } + descheduler.kubeClientSandbox.evictedPodsCache.clear() + + // Verify the pod was restored back to the fake client + pod, err = kubeClientSandbox.fakeClient().CoreV1().Pods(p1.Namespace).Get(ctx, p1.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Expected pod %s to be restored to fake client in cycle %d, but got error: %v", p1.Name, i, err) + } + verifyPodIdentity(t, pod, p1.Name, p1.Namespace, p1.UID) + klog.Infof("Pod %s was successfully restored to fake client in cycle %d (UID: %s)", p1.Name, i, pod.UID) + + // Verify cache was cleared after restoration + evictedPods = descheduler.kubeClientSandbox.evictedPodsCache.list() + if len(evictedPods) != 0 { + t.Fatalf("Expected evicted cache to be empty after restoration in cycle %d, got %d pods", i, len(evictedPods)) + } + klog.Infof("Evicted pods cache was cleared after restoration in cycle %d", i) + } +} diff --git a/pkg/descheduler/evictions/evictions.go b/pkg/descheduler/evictions/evictions.go index df6964d4d..8b478243c 100644 --- a/pkg/descheduler/evictions/evictions.go +++ b/pkg/descheduler/evictions/evictions.go @@ -403,12 +403,6 @@ func (pe *PodEvictor) ResetCounters() { pe.totalPodCount = 0 } -func (pe *PodEvictor) SetClient(client clientset.Interface) { - pe.mu.Lock() - defer pe.mu.Unlock() - pe.client = client -} - func (pe *PodEvictor) evictionRequestsTotal() uint { if pe.featureGates.Enabled(features.EvictionsInBackground) { return pe.erCache.evictionRequestsTotal() diff --git a/test/test_utils.go b/test/test_utils.go index 4d641d723..d953b4a9b 100644 --- a/test/test_utils.go +++ b/test/test_utils.go @@ -43,6 +43,10 @@ import ( // BuildTestPod creates a test pod with given parameters. func BuildTestPod(name string, cpu, memory int64, nodeName string, apply func(*v1.Pod)) *v1.Pod { pod := &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, ObjectMeta: metav1.ObjectMeta{ Namespace: "default", Name: name, @@ -76,6 +80,10 @@ func BuildTestPod(name string, cpu, memory int64, nodeName string, apply func(*v func BuildTestPDB(name, appLabel string) *policyv1.PodDisruptionBudget { maxUnavailable := intstr.FromInt32(1) pdb := &policyv1.PodDisruptionBudget{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "policy/v1", + Kind: "PodDisruptionBudget", + }, ObjectMeta: metav1.ObjectMeta{ Namespace: "default", Name: name, @@ -94,6 +102,10 @@ func BuildTestPDB(name, appLabel string) *policyv1.PodDisruptionBudget { func BuildTestPVC(name, storageClass string) *v1.PersistentVolumeClaim { pvc := &v1.PersistentVolumeClaim{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "PersistentVolumeClaim", + }, ObjectMeta: metav1.ObjectMeta{ Namespace: "default", Name: name, @@ -114,6 +126,10 @@ func BuildTestPVC(name, storageClass string) *v1.PersistentVolumeClaim { // BuildPodMetrics creates a test podmetrics with given parameters. 
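+// TypeMeta is set explicitly in these builders so the objects carry a non-empty
+// GroupVersionKind; hasRuntimeObjectInIndexer in pkg/descheduler uses it to guess the
+// matching GroupVersionResource for indexer lookups.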
func BuildPodMetrics(name string, millicpu, mem int64) *v1beta1.PodMetrics { return &v1beta1.PodMetrics{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "metrics.k8s.io/v1beta1", + Kind: "PodMetrics", + }, ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: "default", @@ -171,6 +187,10 @@ func GetDaemonSetOwnerRefList() []metav1.OwnerReference { // BuildTestNode creates a node with specified capacity. func BuildTestNode(name string, millicpu, mem, pods int64, apply func(*v1.Node)) *v1.Node { node := &v1.Node{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Node", + }, ObjectMeta: metav1.ObjectMeta{ Name: name, SelfLink: fmt.Sprintf("/api/v1/nodes/%s", name), @@ -201,6 +221,10 @@ func BuildTestNode(name string, millicpu, mem, pods int64, apply func(*v1.Node)) func BuildNodeMetrics(name string, millicpu, mem int64) *v1beta1.NodeMetrics { return &v1beta1.NodeMetrics{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "metrics.k8s.io/v1beta1", + Kind: "NodeMetrics", + }, ObjectMeta: metav1.ObjectMeta{ Name: name, },
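For illustration only, outside the patch: a minimal, self-contained sketch of how a GroupVersionResource can be guessed from the TypeMeta that the builders above now set, mirroring the meta.UnsafeGuessKindToResource call in hasRuntimeObjectInIndexer.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	// GVK as carried by the TypeMeta that BuildTestPod now sets.
	gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}

	// UnsafeGuessKindToResource lowercases and pluralizes the Kind to guess the resource name.
	plural, singular := meta.UnsafeGuessKindToResource(gvk)
	fmt.Println(plural.Resource, singular.Resource) // pods pod
}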