diff --git a/pkg/descheduler/descheduler.go b/pkg/descheduler/descheduler.go index 6f7095972..bc825e64b 100644 --- a/pkg/descheduler/descheduler.go +++ b/pkg/descheduler/descheduler.go @@ -22,7 +22,6 @@ import ( "math" "net/http" "strconv" - "sync" "time" promapi "github.com/prometheus/client_golang/api" @@ -31,15 +30,9 @@ import ( "go.opentelemetry.io/otel/trace" v1 "k8s.io/api/core/v1" - policy "k8s.io/api/policy/v1" - policyv1 "k8s.io/api/policy/v1" - schedulingv1 "k8s.io/api/scheduling/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilversion "k8s.io/apimachinery/pkg/util/version" "k8s.io/apimachinery/pkg/util/wait" @@ -49,7 +42,6 @@ import ( fakeclientset "k8s.io/client-go/kubernetes/fake" corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/rest" - core "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/events" "k8s.io/client-go/util/workqueue" @@ -86,52 +78,6 @@ type profileRunner struct { descheduleEPs, balanceEPs eprunner } -// evictedPodInfo stores identifying information about a pod that was evicted during dry-run mode -type evictedPodInfo struct { - Namespace string - Name string - UID string -} - -// evictedPodsCache is a thread-safe cache for tracking pods evicted during dry-run mode -type evictedPodsCache struct { - sync.RWMutex - pods map[string]*evictedPodInfo -} - -func newEvictedPodsCache() *evictedPodsCache { - return &evictedPodsCache{ - pods: make(map[string]*evictedPodInfo), - } -} - -func (c *evictedPodsCache) add(pod *v1.Pod) { - c.Lock() - defer c.Unlock() - c.pods[string(pod.UID)] = &evictedPodInfo{ - Namespace: pod.Namespace, - Name: pod.Name, - UID: string(pod.UID), - } -} - -func (c *evictedPodsCache) list() []*evictedPodInfo { - c.RLock() - defer c.RUnlock() - pods := make([]*evictedPodInfo, 0, len(c.pods)) - for _, pod := range c.pods { - podCopy := *pod - pods = append(pods, &podCopy) - } - return pods -} - -func (c *evictedPodsCache) clear() { - c.Lock() - defer c.Unlock() - c.pods = make(map[string]*evictedPodInfo) -} - type descheduler struct { rs *options.DeschedulerServer client clientset.Interface @@ -150,289 +96,6 @@ type descheduler struct { metricsProviders map[api.MetricsSource]*api.MetricsProvider } -// kubeClientSandbox creates a sandbox environment with a fake client and informer factory -// that mirrors resources from a real client, useful for dry-run testing scenarios -type kubeClientSandbox struct { - fakeKubeClient *fakeclientset.Clientset - fakeFactory informers.SharedInformerFactory - resourceToInformer map[schema.GroupVersionResource]informers.GenericInformer - evictedPodsCache *evictedPodsCache - podEvictionReactionFnc func(*fakeclientset.Clientset, *evictedPodsCache) func(action core.Action) (bool, runtime.Object, error) -} - -func newDefaultKubeClientSandbox(client clientset.Interface, sharedInformerFactory informers.SharedInformerFactory) (*kubeClientSandbox, error) { - return newKubeClientSandbox(client, sharedInformerFactory, - v1.SchemeGroupVersion.WithResource("pods"), - v1.SchemeGroupVersion.WithResource("nodes"), - v1.SchemeGroupVersion.WithResource("namespaces"), - schedulingv1.SchemeGroupVersion.WithResource("priorityclasses"), - policyv1.SchemeGroupVersion.WithResource("poddisruptionbudgets"), - 
v1.SchemeGroupVersion.WithResource("persistentvolumeclaims"), - ) -} - -func newKubeClientSandbox(client clientset.Interface, sharedInformerFactory informers.SharedInformerFactory, resources ...schema.GroupVersionResource) (*kubeClientSandbox, error) { - sandbox := &kubeClientSandbox{ - resourceToInformer: make(map[schema.GroupVersionResource]informers.GenericInformer), - evictedPodsCache: newEvictedPodsCache(), - podEvictionReactionFnc: podEvictionReactionFnc, - } - - sandbox.fakeKubeClient = fakeclientset.NewSimpleClientset() - // simulate a pod eviction by deleting a pod - sandbox.fakeKubeClient.PrependReactor("create", "pods", sandbox.podEvictionReactionFnc(sandbox.fakeKubeClient, sandbox.evictedPodsCache)) - sandbox.fakeFactory = informers.NewSharedInformerFactory(sandbox.fakeKubeClient, 0) - - for _, resource := range resources { - informer, err := sharedInformerFactory.ForResource(resource) - if err != nil { - return nil, err - } - sandbox.resourceToInformer[resource] = informer - } - - // Register event handlers to sync changes from real client to fake client. - // These handlers will keep the fake client in sync with ongoing changes. - if err := sandbox.registerEventHandlers(); err != nil { - return nil, fmt.Errorf("error registering event handlers: %w", err) - } - - return sandbox, nil -} - -func (sandbox *kubeClientSandbox) registerEventHandlers() error { - for resource, informer := range sandbox.resourceToInformer { - // Create a local copy to avoid closure capture issue - resource := resource - - _, err := sandbox.fakeFactory.ForResource(resource) - if err != nil { - return fmt.Errorf("error getting resource %s for fake factory: %w", resource, err) - } - - _, err = informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - runtimeObj, ok := obj.(runtime.Object) - if !ok { - klog.ErrorS(nil, "object is not a runtime.Object", "resource", resource) - return - } - if err := sandbox.fakeKubeClient.Tracker().Add(runtimeObj); err != nil { - if !apierrors.IsAlreadyExists(err) { - klog.ErrorS(err, "failed to add object to fake client", "resource", resource) - } - } - }, - UpdateFunc: func(oldObj, newObj interface{}) { - runtimeObj, ok := newObj.(runtime.Object) - if !ok { - klog.ErrorS(nil, "object is not a runtime.Object", "resource", resource) - return - } - metaObj, err := meta.Accessor(runtimeObj) - if err != nil { - klog.ErrorS(err, "failed to get object metadata", "resource", resource) - return - } - if err := sandbox.fakeKubeClient.Tracker().Update(resource, runtimeObj, metaObj.GetNamespace()); err != nil { - klog.ErrorS(err, "failed to update object in fake client", "resource", resource) - } - }, - DeleteFunc: func(obj interface{}) { - // Handle tombstone case where the object might be wrapped - if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { - obj = tombstone.Obj - } - - runtimeObj, ok := obj.(runtime.Object) - if !ok { - klog.ErrorS(nil, "object is not a runtime.Object", "resource", resource) - return - } - metaObj, err := meta.Accessor(runtimeObj) - if err != nil { - klog.ErrorS(err, "failed to get object metadata", "resource", resource) - return - } - if err := sandbox.fakeKubeClient.Tracker().Delete(resource, metaObj.GetNamespace(), metaObj.GetName()); err != nil { - klog.ErrorS(err, "failed to delete object from fake client", "resource", resource) - } - }, - }) - if err != nil { - return fmt.Errorf("error adding event handler for resource %s: %w", resource, err) - } - } - return nil -} - -func (sandbox 
*kubeClientSandbox) fakeClient() *fakeclientset.Clientset { - return sandbox.fakeKubeClient -} - -func (sandbox *kubeClientSandbox) fakeSharedInformerFactory() informers.SharedInformerFactory { - return sandbox.fakeFactory -} - -func (sandbox *kubeClientSandbox) reset() { - sandbox.evictedPodsCache.clear() -} - -// hasObjectInIndexer checks if an object exists in the fake indexer for the specified resource -func (sandbox *kubeClientSandbox) hasObjectInIndexer(resource schema.GroupVersionResource, namespace, name string) (bool, error) { - informer, err := sandbox.fakeFactory.ForResource(resource) - if err != nil { - return false, fmt.Errorf("error getting informer for resource %s: %w", resource, err) - } - - key := cache.MetaObjectToName(&metav1.ObjectMeta{Namespace: namespace, Name: name}).String() - _, exists, err := informer.Informer().GetIndexer().GetByKey(key) - if err != nil { - return false, err - } - - return exists, nil -} - -// hasRuntimeObjectInIndexer checks if a runtime.Object exists in the fake indexer by detecting its resource type -func (sandbox *kubeClientSandbox) hasRuntimeObjectInIndexer(obj runtime.Object) (bool, error) { - // Get metadata accessor to extract namespace and name - metaObj, err := meta.Accessor(obj) - if err != nil { - return false, fmt.Errorf("failed to get object metadata: %w", err) - } - - // Get the GVK from the object using TypeMeta - gvk := obj.GetObjectKind().GroupVersionKind() - if gvk.Empty() { - return false, fmt.Errorf("no GroupVersionKind found for object") - } - - // Use the GVK to construct the GVR by pluralizing the kind - plural, _ := meta.UnsafeGuessKindToResource(gvk) - gvr := schema.GroupVersionResource{ - Group: gvk.Group, - Version: gvk.Version, - Resource: plural.Resource, - } - - return sandbox.hasObjectInIndexer(gvr, metaObj.GetNamespace(), metaObj.GetName()) -} - -func waitForPodsCondition(ctx context.Context, pods []*evictedPodInfo, checkFn func(*evictedPodInfo) (bool, error), successMsg string) error { - if len(pods) == 0 { - return nil - } - - err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - for _, pod := range pods { - satisfied, err := checkFn(pod) - if err != nil { - return false, err - } - if !satisfied { - return false, nil - } - } - return true, nil - }) - if err != nil { - return err - } - - klog.V(4).InfoS(successMsg) - return nil -} - -// restoreEvictedPods restores pods from the evicted pods cache back to the fake client -func (sandbox *kubeClientSandbox) restoreEvictedPods(ctx context.Context) error { - podInformer, ok := sandbox.resourceToInformer[v1.SchemeGroupVersion.WithResource("pods")] - if !ok { - return fmt.Errorf("pod informer not found in resourceToInformer") - } - - evictedPods := sandbox.evictedPodsCache.list() - - // First wait loop: Check that all evicted pods are cleared from the indexers. - // This ensures the eviction has fully propagated through the fake informer's indexer. 
- if err := waitForPodsCondition(ctx, evictedPods, func(pod *evictedPodInfo) (bool, error) { - exists, err := sandbox.hasObjectInIndexer(v1.SchemeGroupVersion.WithResource("pods"), pod.Namespace, pod.Name) - if err != nil { - klog.V(4).InfoS("Error checking indexer for pod", "namespace", pod.Namespace, "name", pod.Name, "error", err) - return false, nil - } - if exists { - klog.V(4).InfoS("Pod still exists in fake indexer, waiting", "namespace", pod.Namespace, "name", pod.Name) - return false, nil - } - klog.V(4).InfoS("Pod no longer in fake indexer", "namespace", pod.Namespace, "name", pod.Name) - return true, nil - }, "All evicted pods removed from fake indexer"); err != nil { - return fmt.Errorf("timeout waiting for evicted pods to be removed from fake indexer: %w", err) - } - - var restoredPods []*evictedPodInfo - for _, evictedPodInfo := range sandbox.evictedPodsCache.list() { - obj, err := podInformer.Lister().ByNamespace(evictedPodInfo.Namespace).Get(evictedPodInfo.Name) - if err != nil { - klog.V(3).InfoS("Pod not found in real client, skipping restoration", "namespace", evictedPodInfo.Namespace, "name", evictedPodInfo.Name, "error", err) - continue - } - - pod, ok := obj.(*v1.Pod) - if !ok { - klog.ErrorS(nil, "Object is not a pod", "namespace", evictedPodInfo.Namespace, "name", evictedPodInfo.Name) - continue - } - - if string(pod.UID) != evictedPodInfo.UID { - klog.V(3).InfoS("Pod UID mismatch, skipping restoration", "namespace", evictedPodInfo.Namespace, "name", evictedPodInfo.Name, "expectedUID", evictedPodInfo.UID, "actualUID", string(pod.UID)) - continue - } - - if err := sandbox.fakeKubeClient.Tracker().Add(pod); err != nil && !apierrors.IsAlreadyExists(err) { - return fmt.Errorf("failed to restore pod %s/%s to fake client: %w", evictedPodInfo.Namespace, evictedPodInfo.Name, err) - } - klog.V(4).InfoS("Successfully restored pod to fake client", "namespace", evictedPodInfo.Namespace, "name", evictedPodInfo.Name, "uid", evictedPodInfo.UID) - restoredPods = append(restoredPods, evictedPodInfo) - } - - // Second wait loop: Make sure the evicted pods are added back to the fake client. - // This ensures the restored pods are accessible through the fake informer's lister. - if err := waitForPodsCondition(ctx, restoredPods, func(pod *evictedPodInfo) (bool, error) { - podObj, err := sandbox.fakeFactory.Core().V1().Pods().Lister().Pods(pod.Namespace).Get(pod.Name) - if err != nil { - klog.V(4).InfoS("Pod not yet accessible in fake informer, waiting", "namespace", pod.Namespace, "name", pod.Name) - return false, nil - } - klog.V(4).InfoS("Pod accessible in fake informer", "namespace", pod.Namespace, "name", pod.Name, "node", podObj.Spec.NodeName) - return true, nil - }, "All restored pods are accessible in fake informer"); err != nil { - return fmt.Errorf("timeout waiting for pods to be accessible in fake informer: %w", err) - } - - // Third wait loop: Make sure the indexers see the added pods. - // This is important to ensure each descheduling cycle can see all the restored pods. - // Without this wait, the next cycle might not see the restored pods in the indexer yet. 
- if err := waitForPodsCondition(ctx, restoredPods, func(pod *evictedPodInfo) (bool, error) { - exists, err := sandbox.hasObjectInIndexer(v1.SchemeGroupVersion.WithResource("pods"), pod.Namespace, pod.Name) - if err != nil { - klog.V(4).InfoS("Error checking indexer for restored pod", "namespace", pod.Namespace, "name", pod.Name, "error", err) - return false, nil - } - if !exists { - klog.V(4).InfoS("Restored pod not yet in fake indexer, waiting", "namespace", pod.Namespace, "name", pod.Name) - return false, nil - } - klog.V(4).InfoS("Restored pod now in fake indexer", "namespace", pod.Namespace, "name", pod.Name) - return true, nil - }, "All restored pods are now in fake indexer"); err != nil { - return fmt.Errorf("timeout waiting for restored pods to appear in fake indexer: %w", err) - } - - return nil -} - func nodeSelectorFromPolicy(deschedulerPolicy *api.DeschedulerPolicy) (labels.Selector, error) { nodeSelector := labels.Everything() if deschedulerPolicy.NodeSelector != nil { @@ -658,7 +321,7 @@ func (d *descheduler) runDeschedulerLoop(ctx context.Context) error { klog.V(3).Infof("Resetting pod evictor counters") d.podEvictor.ResetCounters() - d.runProfiles(ctx, d.client) + d.runProfiles(ctx) if d.rs.DryRun { if d.kubeClientSandbox == nil { @@ -680,7 +343,7 @@ func (d *descheduler) runDeschedulerLoop(ctx context.Context) error { // runProfiles runs all the deschedule plugins of all profiles and // later runs through all balance plugins of all profiles. (All Balance plugins should come after all Deschedule plugins) // see https://github.com/kubernetes-sigs/descheduler/issues/979 -func (d *descheduler) runProfiles(ctx context.Context, client clientset.Interface) { +func (d *descheduler) runProfiles(ctx context.Context) { var span trace.Span ctx, span = tracing.Tracer().Start(ctx, "runProfiles") defer span.End() @@ -711,7 +374,7 @@ func (d *descheduler) runProfiles(ctx context.Context, client clientset.Interfac ctx, profile, pluginregistry.PluginRegistry, - frameworkprofile.WithClientSet(client), + frameworkprofile.WithClientSet(d.client), frameworkprofile.WithSharedInformerFactory(d.sharedInformerFactory), frameworkprofile.WithPodEvictor(d.podEvictor), frameworkprofile.WithGetPodsAssignedToNodeFnc(d.getPodsAssignedToNode), @@ -846,37 +509,6 @@ func validateVersionCompatibility(discovery discovery.DiscoveryInterface, desche return nil } -func podEvictionReactionFnc(fakeClient *fakeclientset.Clientset, evictedCache *evictedPodsCache) func(action core.Action) (bool, runtime.Object, error) { - return func(action core.Action) (bool, runtime.Object, error) { - if action.GetSubresource() == "eviction" { - createAct, matched := action.(core.CreateActionImpl) - if !matched { - return false, nil, fmt.Errorf("unable to convert action to core.CreateActionImpl") - } - eviction, matched := createAct.Object.(*policy.Eviction) - if !matched { - return false, nil, fmt.Errorf("unable to convert action object into *policy.Eviction") - } - podObj, err := fakeClient.Tracker().Get(action.GetResource(), eviction.GetNamespace(), eviction.GetName()) - if err == nil { - if pod, ok := podObj.(*v1.Pod); ok { - evictedCache.add(pod) - } else { - return false, nil, fmt.Errorf("unable to convert object to *v1.Pod for %v/%v", eviction.GetNamespace(), eviction.GetName()) - } - } else if !apierrors.IsNotFound(err) { - return false, nil, fmt.Errorf("unable to get pod %v/%v: %v", eviction.GetNamespace(), eviction.GetName(), err) - } - if err := fakeClient.Tracker().Delete(action.GetResource(), eviction.GetNamespace(), 
eviction.GetName()); err != nil { - return false, nil, fmt.Errorf("unable to delete pod %v/%v: %v", eviction.GetNamespace(), eviction.GetName(), err) - } - return true, nil, nil - } - // fallback to the default reactor - return false, nil, nil - } -} - type tokenReconciliation int const ( diff --git a/pkg/descheduler/descheduler_test.go b/pkg/descheduler/descheduler_test.go index 7fdc6e566..9a518e6cd 100644 --- a/pkg/descheduler/descheduler_test.go +++ b/pkg/descheduler/descheduler_test.go @@ -606,7 +606,7 @@ func TestPodEvictorReset(t *testing.T) { func runDeschedulerLoopAndGetEvictedPods(ctx context.Context, t *testing.T, d *descheduler, dryRun bool) []string { d.podEvictor.ResetCounters() - d.runProfiles(ctx, d.client) + d.runProfiles(ctx) var evictedPodNames []string if dryRun { @@ -1148,257 +1148,6 @@ func TestLoadAwareDescheduling(t *testing.T) { t.Logf("Total evictions: %v", totalEs) } -func TestKubeClientSandboxReset(t *testing.T) { - ctx := context.Background() - node1 := test.BuildTestNode("n1", 2000, 3000, 10, nil) - p1 := test.BuildTestPod("p1", 100, 0, node1.Name, test.SetRSOwnerRef) - p2 := test.BuildTestPod("p2", 100, 0, node1.Name, test.SetRSOwnerRef) - - client := fakeclientset.NewSimpleClientset(node1, p1, p2) - sharedInformerFactory := informers.NewSharedInformerFactoryWithOptions(client, 0) - - // Explicitly get the informers to ensure they're registered - _ = sharedInformerFactory.Core().V1().Pods().Informer() - _ = sharedInformerFactory.Core().V1().Nodes().Informer() - - ctx, cancel := context.WithCancel(ctx) - defer cancel() - sharedInformerFactory.Start(ctx.Done()) - sharedInformerFactory.WaitForCacheSync(ctx.Done()) - - sandbox, err := newKubeClientSandbox(client, sharedInformerFactory, - v1.SchemeGroupVersion.WithResource("pods"), - v1.SchemeGroupVersion.WithResource("nodes"), - ) - if err != nil { - t.Fatalf("Failed to create kubeClientSandbox: %v", err) - } - - eviction1 := &policy.Eviction{ - ObjectMeta: metav1.ObjectMeta{ - Name: p1.Name, - Namespace: p1.Namespace, - }, - } - eviction2 := &policy.Eviction{ - ObjectMeta: metav1.ObjectMeta{ - Name: p2.Name, - Namespace: p2.Namespace, - }, - } - - sandbox.fakeSharedInformerFactory().Start(ctx.Done()) - sandbox.fakeSharedInformerFactory().WaitForCacheSync(ctx.Done()) - - if err := sandbox.fakeClient().CoreV1().Pods(p1.Namespace).EvictV1(context.TODO(), eviction1); err != nil { - t.Fatalf("Error evicting p1: %v", err) - } - if err := sandbox.fakeClient().CoreV1().Pods(p2.Namespace).EvictV1(context.TODO(), eviction2); err != nil { - t.Fatalf("Error evicting p2: %v", err) - } - - evictedPods := sandbox.evictedPodsCache.list() - if len(evictedPods) != 2 { - t.Fatalf("Expected 2 evicted pods in cache, but got %d", len(evictedPods)) - } - t.Logf("Evicted pods in cache before reset: %d", len(evictedPods)) - - for _, evictedPod := range evictedPods { - if evictedPod.Namespace == "" || evictedPod.Name == "" || evictedPod.UID == "" { - t.Errorf("Evicted pod has empty fields: namespace=%s, name=%s, uid=%s", evictedPod.Namespace, evictedPod.Name, evictedPod.UID) - } - t.Logf("Evicted pod: %s/%s (UID: %s)", evictedPod.Namespace, evictedPod.Name, evictedPod.UID) - } - - sandbox.reset() - - evictedPodsAfterReset := sandbox.evictedPodsCache.list() - if len(evictedPodsAfterReset) != 0 { - t.Fatalf("Expected cache to be empty after reset, but found %d pods", len(evictedPodsAfterReset)) - } - t.Logf("Successfully verified cache is empty after reset") -} - -func TestEvictedPodsCache(t *testing.T) { - t.Run("add single pod", func(t 
*testing.T) { - const ( - podName = "pod1" - podNamespace = "default" - podUID = "a1b2c3d4-e5f6-7890-abcd-ef1234567890" - ) - cache := newEvictedPodsCache() - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: podName, - Namespace: podNamespace, - UID: podUID, - }, - } - - cache.add(pod) - - pods := cache.list() - if len(pods) != 1 { - t.Fatalf("Expected 1 pod in cache, got %d", len(pods)) - } - if pods[0].Name != podName || pods[0].Namespace != podNamespace || pods[0].UID != podUID { - t.Errorf("Pod data mismatch: got name=%s, namespace=%s, uid=%s", pods[0].Name, pods[0].Namespace, pods[0].UID) - } - }) - - t.Run("add multiple pods", func(t *testing.T) { - cache := newEvictedPodsCache() - pods := []*v1.Pod{ - {ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default", UID: "11111111-1111-1111-1111-111111111111"}}, - {ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "kube-system", UID: "22222222-2222-2222-2222-222222222222"}}, - {ObjectMeta: metav1.ObjectMeta{Name: "pod3", Namespace: "default", UID: "33333333-3333-3333-3333-333333333333"}}, - } - - for _, pod := range pods { - cache.add(pod) - } - - cachedPods := cache.list() - if len(cachedPods) != 3 { - t.Fatalf("Expected 3 pods in cache, got %d", len(cachedPods)) - } - - podMap := make(map[string]*evictedPodInfo) - for _, cachedPod := range cachedPods { - podMap[cachedPod.UID] = cachedPod - } - - for _, pod := range pods { - cached, ok := podMap[string(pod.UID)] - if !ok { - t.Errorf("Pod with UID %s not found in cache", pod.UID) - continue - } - if cached.Name != pod.Name || cached.Namespace != pod.Namespace { - t.Errorf("Pod data mismatch for UID %s: got name=%s, namespace=%s", pod.UID, cached.Name, cached.Namespace) - } - } - }) - - t.Run("add duplicate pod updates entry", func(t *testing.T) { - const ( - duplicateUID = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" - updatedPodName = "pod1-new" - updatedPodNS = "kube-system" - ) - cache := newEvictedPodsCache() - pod1 := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod1", - Namespace: "default", - UID: duplicateUID, - }, - } - pod2 := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: updatedPodName, - Namespace: updatedPodNS, - UID: duplicateUID, - }, - } - - cache.add(pod1) - cache.add(pod2) - - pods := cache.list() - if len(pods) != 1 { - t.Fatalf("Expected 1 pod in cache (duplicates should overwrite), got %d", len(pods)) - } - if pods[0].Name != updatedPodName || pods[0].Namespace != updatedPodNS { - t.Errorf("Expected pod2 data, got name=%s, namespace=%s", pods[0].Name, pods[0].Namespace) - } - }) - - t.Run("list returns empty array for empty cache", func(t *testing.T) { - cache := newEvictedPodsCache() - pods := cache.list() - if pods == nil { - t.Fatal("Expected non-nil slice from list()") - } - if len(pods) != 0 { - t.Fatalf("Expected empty list, got %d pods", len(pods)) - } - }) - - t.Run("list returns copies not references", func(t *testing.T) { - const originalPodName = "pod1" - cache := newEvictedPodsCache() - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: originalPodName, - Namespace: "default", - UID: "12345678-1234-1234-1234-123456789abc", - }, - } - cache.add(pod) - - pods1 := cache.list() - pods2 := cache.list() - - if len(pods1) != 1 || len(pods2) != 1 { - t.Fatalf("Expected 1 pod in both lists") - } - - pods1[0].Name = "modified" - - if pods2[0].Name == "modified" { - t.Error("Modifying list result should not affect other list results (should be copies)") - } - - pods3 := cache.list() - if pods3[0].Name != originalPodName { - 
t.Error("Cache data was modified, list() should return copies") - } - }) - - t.Run("clear empties the cache", func(t *testing.T) { - cache := newEvictedPodsCache() - cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default", UID: "aaaa0000-0000-0000-0000-000000000001"}}) - cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "kube-system", UID: "bbbb0000-0000-0000-0000-000000000002"}}) - - if len(cache.list()) != 2 { - t.Fatal("Expected 2 pods before clear") - } - - cache.clear() - - pods := cache.list() - if len(pods) != 0 { - t.Fatalf("Expected empty cache after clear, got %d pods", len(pods)) - } - }) - - t.Run("clear on empty cache is safe", func(t *testing.T) { - cache := newEvictedPodsCache() - cache.clear() - - pods := cache.list() - if len(pods) != 0 { - t.Fatalf("Expected empty cache, got %d pods", len(pods)) - } - }) - - t.Run("add after clear works correctly", func(t *testing.T) { - cache := newEvictedPodsCache() - cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default", UID: "00000001-0001-0001-0001-000000000001"}}) - cache.clear() - cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "kube-system", UID: "00000002-0002-0002-0002-000000000002"}}) - - pods := cache.list() - if len(pods) != 1 { - t.Fatalf("Expected 1 pod after clear and add, got %d", len(pods)) - } - if pods[0].Name != "pod2" { - t.Errorf("Expected pod2, got %s", pods[0].Name) - } - }) -} - func TestPodEvictionReactionFncErrorHandling(t *testing.T) { podsGVR := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"} @@ -1614,7 +1363,7 @@ func TestEvictedPodRestorationInDryRun(t *testing.T) { // Run descheduling cycle klog.Infof("Running descheduling cycle %d", i) descheduler.podEvictor.ResetCounters() - descheduler.runProfiles(ctx, descheduler.client) + descheduler.runProfiles(ctx) // Verify the pod was evicted (should not exist in fake client anymore) _, err = kubeClientSandbox.fakeClient().CoreV1().Pods(p1.Namespace).Get(ctx, p1.Name, metav1.GetOptions{}) diff --git a/pkg/descheduler/kubeclientsandbox.go b/pkg/descheduler/kubeclientsandbox.go new file mode 100644 index 000000000..1eacd415a --- /dev/null +++ b/pkg/descheduler/kubeclientsandbox.go @@ -0,0 +1,446 @@ +/* +Copyright 2026 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package descheduler + +import ( + "context" + "fmt" + "sync" + "time" + + v1 "k8s.io/api/core/v1" + policy "k8s.io/api/policy/v1" + policyv1 "k8s.io/api/policy/v1" + schedulingv1 "k8s.io/api/scheduling/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + clientset "k8s.io/client-go/kubernetes" + fakeclientset "k8s.io/client-go/kubernetes/fake" + core "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" +) + +// evictedPodInfo stores identifying information about a pod that was evicted during dry-run mode +type evictedPodInfo struct { + Namespace string + Name string + UID string +} + +// evictedPodsCache is a thread-safe cache for tracking pods evicted during dry-run mode +type evictedPodsCache struct { + sync.RWMutex + pods map[string]*evictedPodInfo +} + +func newEvictedPodsCache() *evictedPodsCache { + return &evictedPodsCache{ + pods: make(map[string]*evictedPodInfo), + } +} + +func (c *evictedPodsCache) add(pod *v1.Pod) { + c.Lock() + defer c.Unlock() + c.pods[string(pod.UID)] = &evictedPodInfo{ + Namespace: pod.Namespace, + Name: pod.Name, + UID: string(pod.UID), + } +} + +func (c *evictedPodsCache) list() []*evictedPodInfo { + c.RLock() + defer c.RUnlock() + pods := make([]*evictedPodInfo, 0, len(c.pods)) + for _, pod := range c.pods { + podCopy := *pod + pods = append(pods, &podCopy) + } + return pods +} + +func (c *evictedPodsCache) clear() { + c.Lock() + defer c.Unlock() + c.pods = make(map[string]*evictedPodInfo) +} + +// kubeClientSandbox creates a sandbox environment with a fake client and informer factory +// that mirrors resources from a real client, useful for dry-run testing scenarios +type kubeClientSandbox struct { + fakeKubeClient *fakeclientset.Clientset + fakeFactory informers.SharedInformerFactory + resourceToInformer map[schema.GroupVersionResource]informers.GenericInformer + evictedPodsCache *evictedPodsCache + podEvictionReactionFnc func(*fakeclientset.Clientset, *evictedPodsCache) func(action core.Action) (bool, runtime.Object, error) +} + +func newDefaultKubeClientSandbox(client clientset.Interface, sharedInformerFactory informers.SharedInformerFactory) (*kubeClientSandbox, error) { + return newKubeClientSandbox(client, sharedInformerFactory, + v1.SchemeGroupVersion.WithResource("pods"), + v1.SchemeGroupVersion.WithResource("nodes"), + v1.SchemeGroupVersion.WithResource("namespaces"), + schedulingv1.SchemeGroupVersion.WithResource("priorityclasses"), + policyv1.SchemeGroupVersion.WithResource("poddisruptionbudgets"), + v1.SchemeGroupVersion.WithResource("persistentvolumeclaims"), + ) +} + +func newKubeClientSandbox(client clientset.Interface, sharedInformerFactory informers.SharedInformerFactory, resources ...schema.GroupVersionResource) (*kubeClientSandbox, error) { + sandbox := &kubeClientSandbox{ + resourceToInformer: make(map[schema.GroupVersionResource]informers.GenericInformer), + evictedPodsCache: newEvictedPodsCache(), + podEvictionReactionFnc: podEvictionReactionFnc, + } + + sandbox.fakeKubeClient = fakeclientset.NewSimpleClientset() + // simulate a pod eviction by deleting a pod + sandbox.fakeKubeClient.PrependReactor("create", "pods", sandbox.podEvictionReactionFnc(sandbox.fakeKubeClient, sandbox.evictedPodsCache)) + sandbox.fakeFactory = 
informers.NewSharedInformerFactory(sandbox.fakeKubeClient, 0) + + for _, resource := range resources { + informer, err := sharedInformerFactory.ForResource(resource) + if err != nil { + return nil, err + } + sandbox.resourceToInformer[resource] = informer + } + + // Register event handlers to sync changes from real client to fake client. + // These handlers will keep the fake client in sync with ongoing changes. + if err := sandbox.registerEventHandlers(); err != nil { + return nil, fmt.Errorf("error registering event handlers: %w", err) + } + + return sandbox, nil +} + +func (sandbox *kubeClientSandbox) registerEventHandlers() error { + for resource, informer := range sandbox.resourceToInformer { + // Create a local copy to avoid closure capture issue + resource := resource + + _, err := sandbox.fakeFactory.ForResource(resource) + if err != nil { + return fmt.Errorf("error getting resource %s for fake factory: %w", resource, err) + } + + _, err = informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + runtimeObj, ok := obj.(runtime.Object) + if !ok { + klog.ErrorS(nil, "object is not a runtime.Object", "resource", resource) + return + } + if err := sandbox.fakeKubeClient.Tracker().Add(runtimeObj); err != nil { + if !apierrors.IsAlreadyExists(err) { + klog.ErrorS(err, "failed to add object to fake client", "resource", resource) + } + } + }, + UpdateFunc: func(oldObj, newObj interface{}) { + runtimeObj, ok := newObj.(runtime.Object) + if !ok { + klog.ErrorS(nil, "object is not a runtime.Object", "resource", resource) + return + } + metaObj, err := meta.Accessor(runtimeObj) + if err != nil { + klog.ErrorS(err, "failed to get object metadata", "resource", resource) + return + } + if err := sandbox.fakeKubeClient.Tracker().Update(resource, runtimeObj, metaObj.GetNamespace()); err != nil { + klog.ErrorS(err, "failed to update object in fake client", "resource", resource) + } + }, + DeleteFunc: func(obj interface{}) { + // Handle tombstone case where the object might be wrapped + if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { + obj = tombstone.Obj + } + + runtimeObj, ok := obj.(runtime.Object) + if !ok { + klog.ErrorS(nil, "object is not a runtime.Object", "resource", resource) + return + } + metaObj, err := meta.Accessor(runtimeObj) + if err != nil { + klog.ErrorS(err, "failed to get object metadata", "resource", resource) + return + } + if err := sandbox.fakeKubeClient.Tracker().Delete(resource, metaObj.GetNamespace(), metaObj.GetName()); err != nil { + klog.ErrorS(err, "failed to delete object from fake client", "resource", resource) + } + }, + }) + if err != nil { + return fmt.Errorf("error adding event handler for resource %s: %w", resource, err) + } + } + return nil +} + +func (sandbox *kubeClientSandbox) fakeClient() *fakeclientset.Clientset { + return sandbox.fakeKubeClient +} + +func (sandbox *kubeClientSandbox) fakeSharedInformerFactory() informers.SharedInformerFactory { + return sandbox.fakeFactory +} + +func (sandbox *kubeClientSandbox) reset() { + sandbox.evictedPodsCache.clear() +} + +// hasObjectInIndexer checks if an object exists in the fake indexer for the specified resource +func (sandbox *kubeClientSandbox) hasObjectInIndexer(resource schema.GroupVersionResource, namespace, name, uid string) (bool, error) { + informer, err := sandbox.fakeFactory.ForResource(resource) + if err != nil { + return false, fmt.Errorf("error getting informer for resource %s: %w", resource, err) + } + + key := 
cache.MetaObjectToName(&metav1.ObjectMeta{Namespace: namespace, Name: name}).String() + obj, exists, err := informer.Informer().GetIndexer().GetByKey(key) + if err != nil { + return false, err + } + + if !exists { + return false, nil + } + + metaObj, err := meta.Accessor(obj) + if err != nil { + return false, fmt.Errorf("failed to get object metadata: %w", err) + } + return string(metaObj.GetUID()) == uid, nil +} + +// hasRuntimeObjectInIndexer checks if a runtime.Object exists in the fake indexer by detecting its resource type +func (sandbox *kubeClientSandbox) hasRuntimeObjectInIndexer(obj runtime.Object) (bool, error) { + // Get metadata accessor to extract namespace and name + metaObj, err := meta.Accessor(obj) + if err != nil { + return false, fmt.Errorf("failed to get object metadata: %w", err) + } + + // Get the GVK from the object using TypeMeta + gvk := obj.GetObjectKind().GroupVersionKind() + if gvk.Empty() { + return false, fmt.Errorf("no GroupVersionKind found for object") + } + + // Use the GVK to construct the GVR by pluralizing the kind + plural, _ := meta.UnsafeGuessKindToResource(gvk) + gvr := schema.GroupVersionResource{ + Group: gvk.Group, + Version: gvk.Version, + Resource: plural.Resource, + } + + return sandbox.hasObjectInIndexer(gvr, metaObj.GetNamespace(), metaObj.GetName(), string(metaObj.GetUID())) +} + +// shouldSkipPodWait checks if a pod still exists in the fake client with the expected UID. +// Returns (shouldSkip, error). shouldSkip is true if we should skip waiting for this pod. +// Returns an error for unexpected failures (non-NotFound errors). +func (sandbox *kubeClientSandbox) shouldSkipPodWait(ctx context.Context, pod *evictedPodInfo) (bool, error) { + // Check if the pod still exists in the fake client (it could have been deleted from the real client + // and the deletion propagated via event handlers in the meantime) + fakePod, err := sandbox.fakeClient().CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + klog.V(3).InfoS("Pod no longer exists in fake client, skipping wait", "namespace", pod.Namespace, "name", pod.Name) + return true, nil + } + return false, fmt.Errorf("error getting pod %s/%s from fake client: %w", pod.Namespace, pod.Name, err) + } + if string(fakePod.UID) != pod.UID { + klog.V(3).InfoS("Pod UID mismatch in fake client, skipping wait", "namespace", pod.Namespace, "name", pod.Name, "expectedUID", pod.UID, "actualUID", string(fakePod.UID)) + return true, nil + } + return false, nil +} + +func waitForPodsCondition(ctx context.Context, pods []*evictedPodInfo, checkFn func(*evictedPodInfo) (bool, error), successMsg string) error { + if len(pods) == 0 { + return nil + } + + err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { + for _, pod := range pods { + satisfied, err := checkFn(pod) + if err != nil { + return false, err + } + if !satisfied { + return false, nil + } + } + return true, nil + }) + if err != nil { + return err + } + + klog.V(4).InfoS(successMsg) + return nil +} + +// restoreEvictedPods restores pods from the evicted pods cache back to the fake client +func (sandbox *kubeClientSandbox) restoreEvictedPods(ctx context.Context) error { + podInformer, ok := sandbox.resourceToInformer[v1.SchemeGroupVersion.WithResource("pods")] + if !ok { + return fmt.Errorf("pod informer not found in resourceToInformer") + } + + evictedPods := sandbox.evictedPodsCache.list() + + // First wait loop: Check 
that all evicted pods are cleared from the indexers. + // This ensures the eviction has fully propagated through the fake informer's indexer. + // We check both existence and UID match to handle cases where a pod was deleted and + // recreated with the same name but different UID. + if err := waitForPodsCondition(ctx, evictedPods, func(pod *evictedPodInfo) (bool, error) { + exists, err := sandbox.hasObjectInIndexer(v1.SchemeGroupVersion.WithResource("pods"), pod.Namespace, pod.Name, pod.UID) + if err != nil { + klog.V(4).InfoS("Error checking indexer for pod", "namespace", pod.Namespace, "name", pod.Name, "error", err) + return false, nil + } + if exists { + klog.V(4).InfoS("Pod with matching UID still exists in fake indexer, waiting", "namespace", pod.Namespace, "name", pod.Name, "uid", pod.UID) + return false, nil + } + klog.V(4).InfoS("Pod with matching UID no longer in fake indexer", "namespace", pod.Namespace, "name", pod.Name, "uid", pod.UID) + return true, nil + }, "All evicted pods removed from fake indexer"); err != nil { + return fmt.Errorf("timeout waiting for evicted pods to be removed from fake indexer: %w", err) + } + + var restoredPods []*evictedPodInfo + for _, podInfo := range sandbox.evictedPodsCache.list() { + obj, err := podInformer.Lister().ByNamespace(podInfo.Namespace).Get(podInfo.Name) + if err != nil { + klog.V(3).InfoS("Pod not found in real client, skipping restoration", "namespace", podInfo.Namespace, "name", podInfo.Name, "error", err) + continue + } + + pod, ok := obj.(*v1.Pod) + if !ok { + klog.ErrorS(nil, "Object is not a pod", "namespace", podInfo.Namespace, "name", podInfo.Name) + continue + } + + if string(pod.UID) != podInfo.UID { + klog.V(3).InfoS("Pod UID mismatch, skipping restoration", "namespace", podInfo.Namespace, "name", podInfo.Name, "expectedUID", podInfo.UID, "actualUID", string(pod.UID)) + continue + } + + if err = sandbox.fakeKubeClient.Tracker().Add(pod); err != nil && !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("failed to restore pod %s/%s to fake client: %w", podInfo.Namespace, podInfo.Name, err) + } + klog.V(4).InfoS("Successfully restored pod to fake client", "namespace", podInfo.Namespace, "name", podInfo.Name, "uid", podInfo.UID) + restoredPods = append(restoredPods, podInfo) + } + + // Second wait loop: Make sure the evicted pods are added back to the fake client. + // This ensures the restored pods are accessible through the fake informer's lister. + if err := waitForPodsCondition(ctx, restoredPods, func(pod *evictedPodInfo) (bool, error) { + podObj, err := sandbox.fakeFactory.Core().V1().Pods().Lister().Pods(pod.Namespace).Get(pod.Name) + if err != nil { + klog.V(4).InfoS("Pod not yet accessible in fake informer, waiting", "namespace", pod.Namespace, "name", pod.Name) + shouldSkip, checkErr := sandbox.shouldSkipPodWait(ctx, pod) + if checkErr != nil { + return false, checkErr + } + if shouldSkip { + return true, nil + } + return false, nil + } + klog.V(4).InfoS("Pod accessible in fake informer", "namespace", pod.Namespace, "name", pod.Name, "node", podObj.Spec.NodeName) + return true, nil + }, "All restored pods are accessible in fake informer"); err != nil { + return fmt.Errorf("timeout waiting for pods to be accessible in fake informer: %w", err) + } + + // Third wait loop: Make sure the indexers see the added pods. + // This is important to ensure each descheduling cycle can see all the restored pods. + // Without this wait, the next cycle might not see the restored pods in the indexer yet. 
+ if err := waitForPodsCondition(ctx, restoredPods, func(pod *evictedPodInfo) (bool, error) { + exists, err := sandbox.hasObjectInIndexer(v1.SchemeGroupVersion.WithResource("pods"), pod.Namespace, pod.Name, pod.UID) + if err != nil { + klog.V(4).InfoS("Error checking indexer for restored pod", "namespace", pod.Namespace, "name", pod.Name, "error", err) + return false, nil + } + if !exists { + klog.V(4).InfoS("Restored pod not yet in fake indexer, waiting", "namespace", pod.Namespace, "name", pod.Name) + shouldSkip, checkErr := sandbox.shouldSkipPodWait(ctx, pod) + if checkErr != nil { + return false, checkErr + } + if shouldSkip { + return true, nil + } + return false, nil + } + klog.V(4).InfoS("Restored pod now in fake indexer", "namespace", pod.Namespace, "name", pod.Name) + return true, nil + }, "All restored pods are now in fake indexer"); err != nil { + return fmt.Errorf("timeout waiting for restored pods to appear in fake indexer: %w", err) + } + + return nil +} + +func podEvictionReactionFnc(fakeClient *fakeclientset.Clientset, evictedCache *evictedPodsCache) func(action core.Action) (bool, runtime.Object, error) { + return func(action core.Action) (bool, runtime.Object, error) { + if action.GetSubresource() == "eviction" { + createAct, matched := action.(core.CreateActionImpl) + if !matched { + return false, nil, fmt.Errorf("unable to convert action to core.CreateActionImpl") + } + eviction, matched := createAct.Object.(*policy.Eviction) + if !matched { + return false, nil, fmt.Errorf("unable to convert action object into *policy.Eviction") + } + podObj, err := fakeClient.Tracker().Get(action.GetResource(), eviction.GetNamespace(), eviction.GetName()) + if err == nil { + if pod, ok := podObj.(*v1.Pod); ok { + evictedCache.add(pod) + } else { + return false, nil, fmt.Errorf("unable to convert object to *v1.Pod for %v/%v", eviction.GetNamespace(), eviction.GetName()) + } + } else if !apierrors.IsNotFound(err) { + return false, nil, fmt.Errorf("unable to get pod %v/%v: %v", eviction.GetNamespace(), eviction.GetName(), err) + } + if err := fakeClient.Tracker().Delete(action.GetResource(), eviction.GetNamespace(), eviction.GetName()); err != nil { + return false, nil, fmt.Errorf("unable to delete pod %v/%v: %v", eviction.GetNamespace(), eviction.GetName(), err) + } + return true, nil, nil + } + // fallback to the default reactor + return false, nil, nil + } +} diff --git a/pkg/descheduler/kubeclientsandbox_test.go b/pkg/descheduler/kubeclientsandbox_test.go new file mode 100644 index 000000000..656ec53ab --- /dev/null +++ b/pkg/descheduler/kubeclientsandbox_test.go @@ -0,0 +1,582 @@ +/* +Copyright 2026 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package descheduler + +import ( + "context" + "testing" + "time" + + v1 "k8s.io/api/core/v1" + policy "k8s.io/api/policy/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + clientset "k8s.io/client-go/kubernetes" + fakeclientset "k8s.io/client-go/kubernetes/fake" + + "sigs.k8s.io/descheduler/test" +) + +// setupTestSandbox creates and initializes a kubeClientSandbox for testing. +// It creates a fake client, informer factory, registers pod and node informers, +// creates the sandbox, and starts both factories. +func setupTestSandbox(ctx context.Context, t *testing.T, initialObjects ...runtime.Object) (*kubeClientSandbox, informers.SharedInformerFactory, clientset.Interface) { + // Create a "real" fake client to act as the source of truth + realClient := fakeclientset.NewSimpleClientset(initialObjects...) + realFactory := informers.NewSharedInformerFactoryWithOptions(realClient, 0) + + // Register pods and nodes informers BEFORE creating the sandbox + _ = realFactory.Core().V1().Pods().Informer() + _ = realFactory.Core().V1().Nodes().Informer() + + // Create the sandbox with only pods and nodes resources + sandbox, err := newKubeClientSandbox(realClient, realFactory, + v1.SchemeGroupVersion.WithResource("pods"), + v1.SchemeGroupVersion.WithResource("nodes"), + ) + if err != nil { + t.Fatalf("Failed to create kubeClientSandbox: %v", err) + } + + // fake factory created by newKubeClientSandbox needs to be started before + // the "real" fake client factory to have all handlers registered + // to get complete propagation + sandbox.fakeSharedInformerFactory().Start(ctx.Done()) + sandbox.fakeSharedInformerFactory().WaitForCacheSync(ctx.Done()) + + realFactory.Start(ctx.Done()) + realFactory.WaitForCacheSync(ctx.Done()) + + return sandbox, realFactory, realClient +} + +func TestKubeClientSandboxEventHandlers(t *testing.T) { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + sandbox, realFactory, realClient := setupTestSandbox(ctx, t) + + // Register a third resource (secrets) in the real factory AFTER sandbox creation + // This should NOT be synced to the fake client + _ = realFactory.Core().V1().Secrets().Informer() + + // Create test objects + testPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + UID: "pod-uid-12345", + }, + Spec: v1.PodSpec{ + NodeName: "test-node", + }, + } + + testNode := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + UID: "node-uid-67890", + }, + } + + testSecret := &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-secret", + Namespace: "default", + UID: "secret-uid-abcde", + }, + Data: map[string][]byte{ + "key": []byte("value"), + }, + } + + // Add objects to the real client + var err error + _, err = realClient.CoreV1().Pods(testPod.Namespace).Create(ctx, testPod, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create pod in real client: %v", err) + } + + _, err = realClient.CoreV1().Nodes().Create(ctx, testNode, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create node in real client: %v", err) + } + + _, err = realClient.CoreV1().Secrets(testSecret.Namespace).Create(ctx, testSecret, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create secret in real client: %v", err) + } + + // Helper function to wait for a resource to appear in the sandbox's fake client indexer + waitForResourceInIndexer := 
func(resourceType, namespace, name, uid, description string) error { + t.Logf("Waiting for %s to appear in fake client indexer...", description) + return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { + exists, err := sandbox.hasObjectInIndexer( + v1.SchemeGroupVersion.WithResource(resourceType), + namespace, + name, + uid, + ) + if err != nil { + t.Logf("Error checking %s in indexer: %v", description, err) + return false, nil + } + if exists { + t.Logf("%s appeared in fake client indexer", description) + return true, nil + } + return false, nil + }) + } + + // Wait for the pod to appear in the sandbox's fake client indexer + if err := waitForResourceInIndexer("pods", testPod.Namespace, testPod.Name, string(testPod.UID), "pod"); err != nil { + t.Fatalf("Pod did not appear in fake client indexer within timeout: %v", err) + } + + // Wait for the node to appear in the sandbox's fake client indexer + if err := waitForResourceInIndexer("nodes", "", testNode.Name, string(testNode.UID), "node"); err != nil { + t.Fatalf("Node did not appear in fake client indexer within timeout: %v", err) + } + + // Verify the pod can be retrieved from the fake client + retrievedPod, err := sandbox.fakeClient().CoreV1().Pods(testPod.Namespace).Get(ctx, testPod.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to retrieve pod from fake client: %v", err) + } + if retrievedPod.Namespace != testPod.Namespace || retrievedPod.Name != testPod.Name || retrievedPod.UID != testPod.UID { + t.Errorf("Retrieved pod mismatch: got namespace=%s name=%s uid=%s, want namespace=%s name=%s uid=%s", + retrievedPod.Namespace, retrievedPod.Name, retrievedPod.UID, testPod.Namespace, testPod.Name, testPod.UID) + } + t.Logf("Successfully retrieved pod from fake client: %s/%s", retrievedPod.Namespace, retrievedPod.Name) + + // Verify the node can be retrieved from the fake client + retrievedNode, err := sandbox.fakeClient().CoreV1().Nodes().Get(ctx, testNode.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to retrieve node from fake client: %v", err) + } + if retrievedNode.Name != testNode.Name || retrievedNode.UID != testNode.UID { + t.Errorf("Retrieved node mismatch: got name=%s uid=%s, want name=%s uid=%s", + retrievedNode.Name, retrievedNode.UID, testNode.Name, testNode.UID) + } + t.Logf("Successfully retrieved node from fake client: %s", retrievedNode.Name) + + // Wait a bit longer and verify the secret does NOT appear in the fake client indexer + // because secrets were registered AFTER the sandbox was created + t.Log("Verifying secret does NOT appear in fake client indexer...") + time.Sleep(500 * time.Millisecond) // Give extra time to ensure it's not just a timing issue + + // First, verify we can get the informer for secrets in the fake factory + secretInformer, err := sandbox.fakeSharedInformerFactory().ForResource(v1.SchemeGroupVersion.WithResource("secrets")) + if err != nil { + t.Logf("Expected: Cannot get secret informer from fake factory: %v", err) + } else { + // If we can get the informer, check if the secret exists in it + key := "default/test-secret" + _, exists, err := secretInformer.Informer().GetIndexer().GetByKey(key) + if err != nil { + t.Logf("Error checking secret in fake indexer: %v", err) + } + if exists { + t.Error("Secret should NOT exist in fake client indexer (it was registered after sandbox creation)") + } else { + t.Log("Correctly verified: Secret does not exist in fake client indexer") + } + } + + // Also 
verify that attempting to get the secret directly from fake client should fail + _, err = sandbox.fakeClient().CoreV1().Secrets(testSecret.Namespace).Get(ctx, testSecret.Name, metav1.GetOptions{}) + if err == nil { + t.Error("Secret should NOT be retrievable from fake client (it was not synced)") + } else { + t.Logf("Correctly verified: Secret not retrievable from fake client: %v", err) + } + + // Verify the secret IS in the real client (sanity check) + _, err = realClient.CoreV1().Secrets(testSecret.Namespace).Get(ctx, testSecret.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Secret should exist in real client but got error: %v", err) + } + t.Log("Sanity check passed: Secret exists in real client") +} + +func TestKubeClientSandboxReset(t *testing.T) { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + node1 := test.BuildTestNode("n1", 2000, 3000, 10, nil) + p1 := test.BuildTestPod("p1", 100, 0, node1.Name, test.SetRSOwnerRef) + p2 := test.BuildTestPod("p2", 100, 0, node1.Name, test.SetRSOwnerRef) + + sandbox, _, _ := setupTestSandbox(ctx, t, node1, p1, p2) + + eviction1 := &policy.Eviction{ + ObjectMeta: metav1.ObjectMeta{ + Name: p1.Name, + Namespace: p1.Namespace, + }, + } + eviction2 := &policy.Eviction{ + ObjectMeta: metav1.ObjectMeta{ + Name: p2.Name, + Namespace: p2.Namespace, + }, + } + + if err := sandbox.fakeClient().CoreV1().Pods(p1.Namespace).EvictV1(context.TODO(), eviction1); err != nil { + t.Fatalf("Error evicting p1: %v", err) + } + if err := sandbox.fakeClient().CoreV1().Pods(p2.Namespace).EvictV1(context.TODO(), eviction2); err != nil { + t.Fatalf("Error evicting p2: %v", err) + } + + evictedPods := sandbox.evictedPodsCache.list() + if len(evictedPods) != 2 { + t.Fatalf("Expected 2 evicted pods in cache, but got %d", len(evictedPods)) + } + t.Logf("Evicted pods in cache before reset: %d", len(evictedPods)) + + for _, evictedPod := range evictedPods { + if evictedPod.Namespace == "" || evictedPod.Name == "" || evictedPod.UID == "" { + t.Errorf("Evicted pod has empty fields: namespace=%s, name=%s, uid=%s", evictedPod.Namespace, evictedPod.Name, evictedPod.UID) + } + t.Logf("Evicted pod: %s/%s (UID: %s)", evictedPod.Namespace, evictedPod.Name, evictedPod.UID) + } + + sandbox.reset() + + evictedPodsAfterReset := sandbox.evictedPodsCache.list() + if len(evictedPodsAfterReset) != 0 { + t.Fatalf("Expected cache to be empty after reset, but found %d pods", len(evictedPodsAfterReset)) + } + t.Logf("Successfully verified cache is empty after reset") +} + +func TestKubeClientSandboxRestoreEvictedPods(t *testing.T) { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + node1 := test.BuildTestNode("n1", 2000, 3000, 10, nil) + + // Create test pods + pod1 := test.BuildTestPod("pod1", 100, 0, node1.Name, test.SetRSOwnerRef) + pod2 := test.BuildTestPod("pod2", 100, 0, node1.Name, test.SetRSOwnerRef) + pod3 := test.BuildTestPod("pod3", 100, 0, node1.Name, test.SetRSOwnerRef) + pod4 := test.BuildTestPod("pod4", 100, 0, node1.Name, test.SetRSOwnerRef) + + sandbox, _, realClient := setupTestSandbox(ctx, t, node1, pod1, pod2, pod3, pod4) + + // Evict all pods + for _, pod := range []*v1.Pod{pod1, pod2, pod3, pod4} { + eviction := &policy.Eviction{ + ObjectMeta: metav1.ObjectMeta{ + Name: pod.Name, + Namespace: pod.Namespace, + }, + } + if err := sandbox.fakeClient().CoreV1().Pods(pod.Namespace).EvictV1(ctx, eviction); err != nil { + t.Fatalf("Error evicting %s: %v", pod.Name, err) + } + } + + // Delete 
pod2 from real client to simulate it being deleted (should skip restoration) + if err := realClient.CoreV1().Pods(pod2.Namespace).Delete(ctx, pod2.Name, metav1.DeleteOptions{}); err != nil { + t.Fatalf("Error deleting pod2 from real client: %v", err) + } + + // Delete and recreate pod3 with different UID in real client (should skip restoration) + if err := realClient.CoreV1().Pods(pod3.Namespace).Delete(ctx, pod3.Name, metav1.DeleteOptions{}); err != nil { + t.Fatalf("Error deleting pod3 from real client: %v", err) + } + pod3New := test.BuildTestPod("pod3", 100, 0, node1.Name, test.SetRSOwnerRef) + // Ensure pod3New has a different UID from pod3 + if pod3New.UID == pod3.UID { + pod3New.UID = "new-uid-for-pod3" + } + if _, err := realClient.CoreV1().Pods(pod3New.Namespace).Create(ctx, pod3New, metav1.CreateOptions{}); err != nil { + t.Fatalf("Error recreating pod3 in real client: %v", err) + } + + // Verify evicted pods are in cache + evictedPods := sandbox.evictedPodsCache.list() + if len(evictedPods) != 4 { + t.Fatalf("Expected 4 evicted pods in cache, got %d", len(evictedPods)) + } + t.Logf("Evicted pods in cache: %d", len(evictedPods)) + + // Call restoreEvictedPods + t.Log("Calling restoreEvictedPods...") + if err := sandbox.restoreEvictedPods(ctx); err != nil { + t.Fatalf("restoreEvictedPods failed: %v", err) + } + + // Verify pod1 and pod4 were restored (exists in fake client with matching UID and accessible via indexer) + for _, pod := range []*v1.Pod{pod1, pod4} { + // Check restoration via fake client + restoredPod, err := sandbox.fakeClient().CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) + if err != nil { + t.Errorf("%s should have been restored to fake client: %v", pod.Name, err) + } else { + if restoredPod.UID != pod.UID { + t.Errorf("Restored %s UID mismatch: got %s, want %s", pod.Name, restoredPod.UID, pod.UID) + } + t.Logf("Successfully verified %s restoration: %s/%s (UID: %s)", pod.Name, restoredPod.Namespace, restoredPod.Name, restoredPod.UID) + } + + // Check accessibility via indexer + exists, err := sandbox.hasObjectInIndexer(v1.SchemeGroupVersion.WithResource("pods"), pod.Namespace, pod.Name, string(pod.UID)) + if err != nil { + t.Errorf("Error checking %s in indexer: %v", pod.Name, err) + } + if !exists { + t.Errorf("%s should exist in fake indexer after restoration", pod.Name) + } else { + t.Logf("Successfully verified %s exists in fake indexer", pod.Name) + } + } + + // Verify pod2 was NOT restored (deleted from real client) + _, err := sandbox.fakeClient().CoreV1().Pods(pod2.Namespace).Get(ctx, pod2.Name, metav1.GetOptions{}) + if err == nil { + t.Error("pod2 should NOT have been restored (was deleted from real client)") + } else { + t.Logf("Correctly verified pod2 was not restored: %v", err) + } + + // Verify pod3 was NOT restored with old UID (UID mismatch case) + // Note: pod3 may exist in fake client with NEW UID due to event handler syncing, + // but it should NOT have been restored with the OLD UID from evicted cache + pod3InFake, err := sandbox.fakeClient().CoreV1().Pods(pod3.Namespace).Get(ctx, pod3.Name, metav1.GetOptions{}) + if err == nil { + // Pod3 exists, but it should have the NEW UID, not the old one + if pod3InFake.UID == pod3.UID { + t.Error("pod3 should NOT have been restored with old UID (UID mismatch should prevent restoration)") + } else { + t.Logf("Correctly verified pod3 has new UID (%s), not old UID (%s) - restoration was skipped", pod3InFake.UID, pod3.UID) + } + } else { + // Pod3 doesn't exist - this is also 
+		// Pod3 doesn't exist - this is also acceptable (event handlers haven't synced it yet)
+		t.Logf("pod3 not found in fake client: %v", err)
+	}
+
+	// Verify evicted pods cache is still intact (restoreEvictedPods doesn't clear it)
+	evictedPodsAfter := sandbox.evictedPodsCache.list()
+	if len(evictedPodsAfter) != 4 {
+		t.Errorf("Expected evicted pods cache to still have 4 entries, got %d", len(evictedPodsAfter))
+	}
+}
+
+func TestKubeClientSandboxRestoreEvictedPodsEmptyCache(t *testing.T) {
+	ctx := context.Background()
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	node1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
+	sandbox, _, _ := setupTestSandbox(ctx, t, node1)
+
+	// Call restoreEvictedPods with empty cache - should be a no-op
+	t.Log("Calling restoreEvictedPods with empty cache...")
+	if err := sandbox.restoreEvictedPods(ctx); err != nil {
+		t.Fatalf("restoreEvictedPods should succeed with empty cache: %v", err)
+	}
+	t.Log("Successfully verified restoreEvictedPods handles empty cache")
+}
+
+func TestEvictedPodsCache(t *testing.T) {
+	t.Run("add single pod", func(t *testing.T) {
+		const (
+			podName      = "pod1"
+			podNamespace = "default"
+			podUID       = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
+		)
+		cache := newEvictedPodsCache()
+		pod := &v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      podName,
+				Namespace: podNamespace,
+				UID:       podUID,
+			},
+		}
+
+		cache.add(pod)
+
+		pods := cache.list()
+		if len(pods) != 1 {
+			t.Fatalf("Expected 1 pod in cache, got %d", len(pods))
+		}
+		if pods[0].Name != podName || pods[0].Namespace != podNamespace || pods[0].UID != podUID {
+			t.Errorf("Pod data mismatch: got name=%s, namespace=%s, uid=%s", pods[0].Name, pods[0].Namespace, pods[0].UID)
+		}
+	})
+
+	t.Run("add multiple pods", func(t *testing.T) {
+		cache := newEvictedPodsCache()
+		pods := []*v1.Pod{
+			{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default", UID: "11111111-1111-1111-1111-111111111111"}},
+			{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "kube-system", UID: "22222222-2222-2222-2222-222222222222"}},
+			{ObjectMeta: metav1.ObjectMeta{Name: "pod3", Namespace: "default", UID: "33333333-3333-3333-3333-333333333333"}},
+		}
+
+		for _, pod := range pods {
+			cache.add(pod)
+		}
+
+		cachedPods := cache.list()
+		if len(cachedPods) != 3 {
+			t.Fatalf("Expected 3 pods in cache, got %d", len(cachedPods))
+		}
+
+		podMap := make(map[string]*evictedPodInfo)
+		for _, cachedPod := range cachedPods {
+			podMap[cachedPod.UID] = cachedPod
+		}
+
+		for _, pod := range pods {
+			cached, ok := podMap[string(pod.UID)]
+			if !ok {
+				t.Errorf("Pod with UID %s not found in cache", pod.UID)
+				continue
+			}
+			if cached.Name != pod.Name || cached.Namespace != pod.Namespace {
+				t.Errorf("Pod data mismatch for UID %s: got name=%s, namespace=%s", pod.UID, cached.Name, cached.Namespace)
+			}
+		}
+	})
+
+	t.Run("add duplicate pod updates entry", func(t *testing.T) {
+		const (
+			duplicateUID   = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
+			updatedPodName = "pod1-new"
+			updatedPodNS   = "kube-system"
+		)
+		cache := newEvictedPodsCache()
+		pod1 := &v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "pod1",
+				Namespace: "default",
+				UID:       duplicateUID,
+			},
+		}
+		pod2 := &v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      updatedPodName,
+				Namespace: updatedPodNS,
+				UID:       duplicateUID,
+			},
+		}
+
+		cache.add(pod1)
+		cache.add(pod2)
+
+		pods := cache.list()
+		if len(pods) != 1 {
+			t.Fatalf("Expected 1 pod in cache (duplicates should overwrite), got %d", len(pods))
+		}
+		if pods[0].Name != updatedPodName || pods[0].Namespace != updatedPodNS {
t.Errorf("Expected pod2 data, got name=%s, namespace=%s", pods[0].Name, pods[0].Namespace) + } + }) + + t.Run("list returns empty array for empty cache", func(t *testing.T) { + cache := newEvictedPodsCache() + pods := cache.list() + if pods == nil { + t.Fatal("Expected non-nil slice from list()") + } + if len(pods) != 0 { + t.Fatalf("Expected empty list, got %d pods", len(pods)) + } + }) + + t.Run("list returns copies not references", func(t *testing.T) { + const originalPodName = "pod1" + cache := newEvictedPodsCache() + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: originalPodName, + Namespace: "default", + UID: "12345678-1234-1234-1234-123456789abc", + }, + } + cache.add(pod) + + pods1 := cache.list() + pods2 := cache.list() + + if len(pods1) != 1 || len(pods2) != 1 { + t.Fatalf("Expected 1 pod in both lists") + } + + pods1[0].Name = "modified" + + if pods2[0].Name == "modified" { + t.Error("Modifying list result should not affect other list results (should be copies)") + } + + pods3 := cache.list() + if pods3[0].Name != originalPodName { + t.Error("Cache data was modified, list() should return copies") + } + }) + + t.Run("clear empties the cache", func(t *testing.T) { + cache := newEvictedPodsCache() + cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default", UID: "aaaa0000-0000-0000-0000-000000000001"}}) + cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "kube-system", UID: "bbbb0000-0000-0000-0000-000000000002"}}) + + if len(cache.list()) != 2 { + t.Fatal("Expected 2 pods before clear") + } + + cache.clear() + + pods := cache.list() + if len(pods) != 0 { + t.Fatalf("Expected empty cache after clear, got %d pods", len(pods)) + } + }) + + t.Run("clear on empty cache is safe", func(t *testing.T) { + cache := newEvictedPodsCache() + cache.clear() + + pods := cache.list() + if len(pods) != 0 { + t.Fatalf("Expected empty cache, got %d pods", len(pods)) + } + }) + + t.Run("add after clear works correctly", func(t *testing.T) { + cache := newEvictedPodsCache() + cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default", UID: "00000001-0001-0001-0001-000000000001"}}) + cache.clear() + cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "kube-system", UID: "00000002-0002-0002-0002-000000000002"}}) + + pods := cache.list() + if len(pods) != 1 { + t.Fatalf("Expected 1 pod after clear and add, got %d", len(pods)) + } + if pods[0].Name != "pod2" { + t.Errorf("Expected pod2, got %s", pods[0].Name) + } + }) +}