Mirror of https://github.com/kubernetes-sigs/descheduler.git, synced 2026-01-25 20:59:28 +01:00

Merge pull request #1814 from ingvagabund/refactorings

fix(kubeClientSandbox): do not wait for pods in the fake indexers if they are already deleted
Kubernetes Prow Robot authored 2026-01-24 19:48:13 +05:30, committed by GitHub
4 changed files with 1033 additions and 624 deletions
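
At a glance, the fix makes the dry-run restore path stop waiting on pods that are already gone: the indexer checks now also match on UID, and each wait loop bails out once the pod no longer exists in the fake client or only exists under a new UID. Below is a minimal, self-contained sketch of that early-exit polling pattern; waitUnlessGone and evictedPod are illustrative names invented for this summary, not identifiers from the diff that follows.

// Illustrative sketch only: waitUnlessGone and evictedPod are hypothetical names,
// not part of the descheduler code changed below.
package main

import (
	"context"
	"fmt"
	"time"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	fakeclientset "k8s.io/client-go/kubernetes/fake"
)

type evictedPod struct {
	Namespace, Name, UID string
}

// waitUnlessGone polls check, but resolves early when the pod no longer exists in
// the fake client or was recreated with a different UID, so callers do not block
// on pods that were already deleted.
func waitUnlessGone(ctx context.Context, client *fakeclientset.Clientset, pod evictedPod, check func() (bool, error)) error {
	return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
		done, err := check()
		if err != nil || done {
			return done, err
		}
		current, err := client.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
		if apierrors.IsNotFound(err) {
			return true, nil // pod already deleted: nothing left to wait for
		}
		if err != nil {
			return false, err
		}
		if string(current.UID) != pod.UID {
			return true, nil // pod recreated under a new UID: stop waiting for the old one
		}
		return false, nil // pod still present with the expected UID: keep polling
	})
}

func main() {
	client := fakeclientset.NewSimpleClientset() // no pods seeded
	err := waitUnlessGone(context.Background(), client,
		evictedPod{Namespace: "default", Name: "p1", UID: "old-uid"},
		func() (bool, error) { return false, nil })
	fmt.Println(err) // <nil>: returns immediately because the pod is already gone
}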

View File

@@ -22,7 +22,6 @@ import (
"math"
"net/http"
"strconv"
- "sync"
"time"
promapi "github.com/prometheus/client_golang/api"
@@ -31,15 +30,9 @@ import (
"go.opentelemetry.io/otel/trace"
v1 "k8s.io/api/core/v1"
- policy "k8s.io/api/policy/v1"
- policyv1 "k8s.io/api/policy/v1"
- schedulingv1 "k8s.io/api/scheduling/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
- "k8s.io/apimachinery/pkg/runtime"
- "k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
utilversion "k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/util/wait"
@@ -49,7 +42,6 @@ import (
fakeclientset "k8s.io/client-go/kubernetes/fake"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/rest"
- core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/util/workqueue"
@@ -86,52 +78,6 @@ type profileRunner struct {
descheduleEPs, balanceEPs eprunner
}
// evictedPodInfo stores identifying information about a pod that was evicted during dry-run mode
type evictedPodInfo struct {
Namespace string
Name string
UID string
}
// evictedPodsCache is a thread-safe cache for tracking pods evicted during dry-run mode
type evictedPodsCache struct {
sync.RWMutex
pods map[string]*evictedPodInfo
}
func newEvictedPodsCache() *evictedPodsCache {
return &evictedPodsCache{
pods: make(map[string]*evictedPodInfo),
}
}
func (c *evictedPodsCache) add(pod *v1.Pod) {
c.Lock()
defer c.Unlock()
c.pods[string(pod.UID)] = &evictedPodInfo{
Namespace: pod.Namespace,
Name: pod.Name,
UID: string(pod.UID),
}
}
func (c *evictedPodsCache) list() []*evictedPodInfo {
c.RLock()
defer c.RUnlock()
pods := make([]*evictedPodInfo, 0, len(c.pods))
for _, pod := range c.pods {
podCopy := *pod
pods = append(pods, &podCopy)
}
return pods
}
func (c *evictedPodsCache) clear() {
c.Lock()
defer c.Unlock()
c.pods = make(map[string]*evictedPodInfo)
}
type descheduler struct {
rs *options.DeschedulerServer
client clientset.Interface
@@ -150,289 +96,6 @@ type descheduler struct {
metricsProviders map[api.MetricsSource]*api.MetricsProvider
}
// kubeClientSandbox creates a sandbox environment with a fake client and informer factory
// that mirrors resources from a real client, useful for dry-run testing scenarios
type kubeClientSandbox struct {
fakeKubeClient *fakeclientset.Clientset
fakeFactory informers.SharedInformerFactory
resourceToInformer map[schema.GroupVersionResource]informers.GenericInformer
evictedPodsCache *evictedPodsCache
podEvictionReactionFnc func(*fakeclientset.Clientset, *evictedPodsCache) func(action core.Action) (bool, runtime.Object, error)
}
func newDefaultKubeClientSandbox(client clientset.Interface, sharedInformerFactory informers.SharedInformerFactory) (*kubeClientSandbox, error) {
return newKubeClientSandbox(client, sharedInformerFactory,
v1.SchemeGroupVersion.WithResource("pods"),
v1.SchemeGroupVersion.WithResource("nodes"),
v1.SchemeGroupVersion.WithResource("namespaces"),
schedulingv1.SchemeGroupVersion.WithResource("priorityclasses"),
policyv1.SchemeGroupVersion.WithResource("poddisruptionbudgets"),
v1.SchemeGroupVersion.WithResource("persistentvolumeclaims"),
)
}
func newKubeClientSandbox(client clientset.Interface, sharedInformerFactory informers.SharedInformerFactory, resources ...schema.GroupVersionResource) (*kubeClientSandbox, error) {
sandbox := &kubeClientSandbox{
resourceToInformer: make(map[schema.GroupVersionResource]informers.GenericInformer),
evictedPodsCache: newEvictedPodsCache(),
podEvictionReactionFnc: podEvictionReactionFnc,
}
sandbox.fakeKubeClient = fakeclientset.NewSimpleClientset()
// simulate a pod eviction by deleting a pod
sandbox.fakeKubeClient.PrependReactor("create", "pods", sandbox.podEvictionReactionFnc(sandbox.fakeKubeClient, sandbox.evictedPodsCache))
sandbox.fakeFactory = informers.NewSharedInformerFactory(sandbox.fakeKubeClient, 0)
for _, resource := range resources {
informer, err := sharedInformerFactory.ForResource(resource)
if err != nil {
return nil, err
}
sandbox.resourceToInformer[resource] = informer
}
// Register event handlers to sync changes from real client to fake client.
// These handlers will keep the fake client in sync with ongoing changes.
if err := sandbox.registerEventHandlers(); err != nil {
return nil, fmt.Errorf("error registering event handlers: %w", err)
}
return sandbox, nil
}
func (sandbox *kubeClientSandbox) registerEventHandlers() error {
for resource, informer := range sandbox.resourceToInformer {
// Create a local copy to avoid closure capture issue
resource := resource
_, err := sandbox.fakeFactory.ForResource(resource)
if err != nil {
return fmt.Errorf("error getting resource %s for fake factory: %w", resource, err)
}
_, err = informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
runtimeObj, ok := obj.(runtime.Object)
if !ok {
klog.ErrorS(nil, "object is not a runtime.Object", "resource", resource)
return
}
if err := sandbox.fakeKubeClient.Tracker().Add(runtimeObj); err != nil {
if !apierrors.IsAlreadyExists(err) {
klog.ErrorS(err, "failed to add object to fake client", "resource", resource)
}
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
runtimeObj, ok := newObj.(runtime.Object)
if !ok {
klog.ErrorS(nil, "object is not a runtime.Object", "resource", resource)
return
}
metaObj, err := meta.Accessor(runtimeObj)
if err != nil {
klog.ErrorS(err, "failed to get object metadata", "resource", resource)
return
}
if err := sandbox.fakeKubeClient.Tracker().Update(resource, runtimeObj, metaObj.GetNamespace()); err != nil {
klog.ErrorS(err, "failed to update object in fake client", "resource", resource)
}
},
DeleteFunc: func(obj interface{}) {
// Handle tombstone case where the object might be wrapped
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = tombstone.Obj
}
runtimeObj, ok := obj.(runtime.Object)
if !ok {
klog.ErrorS(nil, "object is not a runtime.Object", "resource", resource)
return
}
metaObj, err := meta.Accessor(runtimeObj)
if err != nil {
klog.ErrorS(err, "failed to get object metadata", "resource", resource)
return
}
if err := sandbox.fakeKubeClient.Tracker().Delete(resource, metaObj.GetNamespace(), metaObj.GetName()); err != nil {
klog.ErrorS(err, "failed to delete object from fake client", "resource", resource)
}
},
})
if err != nil {
return fmt.Errorf("error adding event handler for resource %s: %w", resource, err)
}
}
return nil
}
func (sandbox *kubeClientSandbox) fakeClient() *fakeclientset.Clientset {
return sandbox.fakeKubeClient
}
func (sandbox *kubeClientSandbox) fakeSharedInformerFactory() informers.SharedInformerFactory {
return sandbox.fakeFactory
}
func (sandbox *kubeClientSandbox) reset() {
sandbox.evictedPodsCache.clear()
}
// hasObjectInIndexer checks if an object exists in the fake indexer for the specified resource
func (sandbox *kubeClientSandbox) hasObjectInIndexer(resource schema.GroupVersionResource, namespace, name string) (bool, error) {
informer, err := sandbox.fakeFactory.ForResource(resource)
if err != nil {
return false, fmt.Errorf("error getting informer for resource %s: %w", resource, err)
}
key := cache.MetaObjectToName(&metav1.ObjectMeta{Namespace: namespace, Name: name}).String()
_, exists, err := informer.Informer().GetIndexer().GetByKey(key)
if err != nil {
return false, err
}
return exists, nil
}
// hasRuntimeObjectInIndexer checks if a runtime.Object exists in the fake indexer by detecting its resource type
func (sandbox *kubeClientSandbox) hasRuntimeObjectInIndexer(obj runtime.Object) (bool, error) {
// Get metadata accessor to extract namespace and name
metaObj, err := meta.Accessor(obj)
if err != nil {
return false, fmt.Errorf("failed to get object metadata: %w", err)
}
// Get the GVK from the object using TypeMeta
gvk := obj.GetObjectKind().GroupVersionKind()
if gvk.Empty() {
return false, fmt.Errorf("no GroupVersionKind found for object")
}
// Use the GVK to construct the GVR by pluralizing the kind
plural, _ := meta.UnsafeGuessKindToResource(gvk)
gvr := schema.GroupVersionResource{
Group: gvk.Group,
Version: gvk.Version,
Resource: plural.Resource,
}
return sandbox.hasObjectInIndexer(gvr, metaObj.GetNamespace(), metaObj.GetName())
}
func waitForPodsCondition(ctx context.Context, pods []*evictedPodInfo, checkFn func(*evictedPodInfo) (bool, error), successMsg string) error {
if len(pods) == 0 {
return nil
}
err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
for _, pod := range pods {
satisfied, err := checkFn(pod)
if err != nil {
return false, err
}
if !satisfied {
return false, nil
}
}
return true, nil
})
if err != nil {
return err
}
klog.V(4).InfoS(successMsg)
return nil
}
// restoreEvictedPods restores pods from the evicted pods cache back to the fake client
func (sandbox *kubeClientSandbox) restoreEvictedPods(ctx context.Context) error {
podInformer, ok := sandbox.resourceToInformer[v1.SchemeGroupVersion.WithResource("pods")]
if !ok {
return fmt.Errorf("pod informer not found in resourceToInformer")
}
evictedPods := sandbox.evictedPodsCache.list()
// First wait loop: Check that all evicted pods are cleared from the indexers.
// This ensures the eviction has fully propagated through the fake informer's indexer.
if err := waitForPodsCondition(ctx, evictedPods, func(pod *evictedPodInfo) (bool, error) {
exists, err := sandbox.hasObjectInIndexer(v1.SchemeGroupVersion.WithResource("pods"), pod.Namespace, pod.Name)
if err != nil {
klog.V(4).InfoS("Error checking indexer for pod", "namespace", pod.Namespace, "name", pod.Name, "error", err)
return false, nil
}
if exists {
klog.V(4).InfoS("Pod still exists in fake indexer, waiting", "namespace", pod.Namespace, "name", pod.Name)
return false, nil
}
klog.V(4).InfoS("Pod no longer in fake indexer", "namespace", pod.Namespace, "name", pod.Name)
return true, nil
}, "All evicted pods removed from fake indexer"); err != nil {
return fmt.Errorf("timeout waiting for evicted pods to be removed from fake indexer: %w", err)
}
var restoredPods []*evictedPodInfo
for _, evictedPodInfo := range sandbox.evictedPodsCache.list() {
obj, err := podInformer.Lister().ByNamespace(evictedPodInfo.Namespace).Get(evictedPodInfo.Name)
if err != nil {
klog.V(3).InfoS("Pod not found in real client, skipping restoration", "namespace", evictedPodInfo.Namespace, "name", evictedPodInfo.Name, "error", err)
continue
}
pod, ok := obj.(*v1.Pod)
if !ok {
klog.ErrorS(nil, "Object is not a pod", "namespace", evictedPodInfo.Namespace, "name", evictedPodInfo.Name)
continue
}
if string(pod.UID) != evictedPodInfo.UID {
klog.V(3).InfoS("Pod UID mismatch, skipping restoration", "namespace", evictedPodInfo.Namespace, "name", evictedPodInfo.Name, "expectedUID", evictedPodInfo.UID, "actualUID", string(pod.UID))
continue
}
if err := sandbox.fakeKubeClient.Tracker().Add(pod); err != nil && !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("failed to restore pod %s/%s to fake client: %w", evictedPodInfo.Namespace, evictedPodInfo.Name, err)
}
klog.V(4).InfoS("Successfully restored pod to fake client", "namespace", evictedPodInfo.Namespace, "name", evictedPodInfo.Name, "uid", evictedPodInfo.UID)
restoredPods = append(restoredPods, evictedPodInfo)
}
// Second wait loop: Make sure the evicted pods are added back to the fake client.
// This ensures the restored pods are accessible through the fake informer's lister.
if err := waitForPodsCondition(ctx, restoredPods, func(pod *evictedPodInfo) (bool, error) {
podObj, err := sandbox.fakeFactory.Core().V1().Pods().Lister().Pods(pod.Namespace).Get(pod.Name)
if err != nil {
klog.V(4).InfoS("Pod not yet accessible in fake informer, waiting", "namespace", pod.Namespace, "name", pod.Name)
return false, nil
}
klog.V(4).InfoS("Pod accessible in fake informer", "namespace", pod.Namespace, "name", pod.Name, "node", podObj.Spec.NodeName)
return true, nil
}, "All restored pods are accessible in fake informer"); err != nil {
return fmt.Errorf("timeout waiting for pods to be accessible in fake informer: %w", err)
}
// Third wait loop: Make sure the indexers see the added pods.
// This is important to ensure each descheduling cycle can see all the restored pods.
// Without this wait, the next cycle might not see the restored pods in the indexer yet.
if err := waitForPodsCondition(ctx, restoredPods, func(pod *evictedPodInfo) (bool, error) {
exists, err := sandbox.hasObjectInIndexer(v1.SchemeGroupVersion.WithResource("pods"), pod.Namespace, pod.Name)
if err != nil {
klog.V(4).InfoS("Error checking indexer for restored pod", "namespace", pod.Namespace, "name", pod.Name, "error", err)
return false, nil
}
if !exists {
klog.V(4).InfoS("Restored pod not yet in fake indexer, waiting", "namespace", pod.Namespace, "name", pod.Name)
return false, nil
}
klog.V(4).InfoS("Restored pod now in fake indexer", "namespace", pod.Namespace, "name", pod.Name)
return true, nil
}, "All restored pods are now in fake indexer"); err != nil {
return fmt.Errorf("timeout waiting for restored pods to appear in fake indexer: %w", err)
}
return nil
}
func nodeSelectorFromPolicy(deschedulerPolicy *api.DeschedulerPolicy) (labels.Selector, error) {
nodeSelector := labels.Everything()
if deschedulerPolicy.NodeSelector != nil {
@@ -658,7 +321,7 @@ func (d *descheduler) runDeschedulerLoop(ctx context.Context) error {
klog.V(3).Infof("Resetting pod evictor counters")
d.podEvictor.ResetCounters()
- d.runProfiles(ctx, d.client)
+ d.runProfiles(ctx)
if d.rs.DryRun {
if d.kubeClientSandbox == nil {
@@ -680,7 +343,7 @@ func (d *descheduler) runDeschedulerLoop(ctx context.Context) error {
// runProfiles runs all the deschedule plugins of all profiles and
// later runs through all balance plugins of all profiles. (All Balance plugins should come after all Deschedule plugins)
// see https://github.com/kubernetes-sigs/descheduler/issues/979
- func (d *descheduler) runProfiles(ctx context.Context, client clientset.Interface) {
+ func (d *descheduler) runProfiles(ctx context.Context) {
var span trace.Span
ctx, span = tracing.Tracer().Start(ctx, "runProfiles")
defer span.End()
@@ -711,7 +374,7 @@ func (d *descheduler) runProfiles(ctx context.Context, client clientset.Interfac
ctx,
profile,
pluginregistry.PluginRegistry,
- frameworkprofile.WithClientSet(client),
+ frameworkprofile.WithClientSet(d.client),
frameworkprofile.WithSharedInformerFactory(d.sharedInformerFactory),
frameworkprofile.WithPodEvictor(d.podEvictor),
frameworkprofile.WithGetPodsAssignedToNodeFnc(d.getPodsAssignedToNode),
@@ -846,37 +509,6 @@ func validateVersionCompatibility(discovery discovery.DiscoveryInterface, desche
return nil
}
func podEvictionReactionFnc(fakeClient *fakeclientset.Clientset, evictedCache *evictedPodsCache) func(action core.Action) (bool, runtime.Object, error) {
return func(action core.Action) (bool, runtime.Object, error) {
if action.GetSubresource() == "eviction" {
createAct, matched := action.(core.CreateActionImpl)
if !matched {
return false, nil, fmt.Errorf("unable to convert action to core.CreateActionImpl")
}
eviction, matched := createAct.Object.(*policy.Eviction)
if !matched {
return false, nil, fmt.Errorf("unable to convert action object into *policy.Eviction")
}
podObj, err := fakeClient.Tracker().Get(action.GetResource(), eviction.GetNamespace(), eviction.GetName())
if err == nil {
if pod, ok := podObj.(*v1.Pod); ok {
evictedCache.add(pod)
} else {
return false, nil, fmt.Errorf("unable to convert object to *v1.Pod for %v/%v", eviction.GetNamespace(), eviction.GetName())
}
} else if !apierrors.IsNotFound(err) {
return false, nil, fmt.Errorf("unable to get pod %v/%v: %v", eviction.GetNamespace(), eviction.GetName(), err)
}
if err := fakeClient.Tracker().Delete(action.GetResource(), eviction.GetNamespace(), eviction.GetName()); err != nil {
return false, nil, fmt.Errorf("unable to delete pod %v/%v: %v", eviction.GetNamespace(), eviction.GetName(), err)
}
return true, nil, nil
}
// fallback to the default reactor
return false, nil, nil
}
}
type tokenReconciliation int
const (

View File

@@ -606,7 +606,7 @@ func TestPodEvictorReset(t *testing.T) {
func runDeschedulerLoopAndGetEvictedPods(ctx context.Context, t *testing.T, d *descheduler, dryRun bool) []string {
d.podEvictor.ResetCounters()
- d.runProfiles(ctx, d.client)
+ d.runProfiles(ctx)
var evictedPodNames []string
if dryRun {
@@ -1148,257 +1148,6 @@ func TestLoadAwareDescheduling(t *testing.T) {
t.Logf("Total evictions: %v", totalEs)
}
func TestKubeClientSandboxReset(t *testing.T) {
ctx := context.Background()
node1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
p1 := test.BuildTestPod("p1", 100, 0, node1.Name, test.SetRSOwnerRef)
p2 := test.BuildTestPod("p2", 100, 0, node1.Name, test.SetRSOwnerRef)
client := fakeclientset.NewSimpleClientset(node1, p1, p2)
sharedInformerFactory := informers.NewSharedInformerFactoryWithOptions(client, 0)
// Explicitly get the informers to ensure they're registered
_ = sharedInformerFactory.Core().V1().Pods().Informer()
_ = sharedInformerFactory.Core().V1().Nodes().Informer()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())
sandbox, err := newKubeClientSandbox(client, sharedInformerFactory,
v1.SchemeGroupVersion.WithResource("pods"),
v1.SchemeGroupVersion.WithResource("nodes"),
)
if err != nil {
t.Fatalf("Failed to create kubeClientSandbox: %v", err)
}
eviction1 := &policy.Eviction{
ObjectMeta: metav1.ObjectMeta{
Name: p1.Name,
Namespace: p1.Namespace,
},
}
eviction2 := &policy.Eviction{
ObjectMeta: metav1.ObjectMeta{
Name: p2.Name,
Namespace: p2.Namespace,
},
}
sandbox.fakeSharedInformerFactory().Start(ctx.Done())
sandbox.fakeSharedInformerFactory().WaitForCacheSync(ctx.Done())
if err := sandbox.fakeClient().CoreV1().Pods(p1.Namespace).EvictV1(context.TODO(), eviction1); err != nil {
t.Fatalf("Error evicting p1: %v", err)
}
if err := sandbox.fakeClient().CoreV1().Pods(p2.Namespace).EvictV1(context.TODO(), eviction2); err != nil {
t.Fatalf("Error evicting p2: %v", err)
}
evictedPods := sandbox.evictedPodsCache.list()
if len(evictedPods) != 2 {
t.Fatalf("Expected 2 evicted pods in cache, but got %d", len(evictedPods))
}
t.Logf("Evicted pods in cache before reset: %d", len(evictedPods))
for _, evictedPod := range evictedPods {
if evictedPod.Namespace == "" || evictedPod.Name == "" || evictedPod.UID == "" {
t.Errorf("Evicted pod has empty fields: namespace=%s, name=%s, uid=%s", evictedPod.Namespace, evictedPod.Name, evictedPod.UID)
}
t.Logf("Evicted pod: %s/%s (UID: %s)", evictedPod.Namespace, evictedPod.Name, evictedPod.UID)
}
sandbox.reset()
evictedPodsAfterReset := sandbox.evictedPodsCache.list()
if len(evictedPodsAfterReset) != 0 {
t.Fatalf("Expected cache to be empty after reset, but found %d pods", len(evictedPodsAfterReset))
}
t.Logf("Successfully verified cache is empty after reset")
}
func TestEvictedPodsCache(t *testing.T) {
t.Run("add single pod", func(t *testing.T) {
const (
podName = "pod1"
podNamespace = "default"
podUID = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
)
cache := newEvictedPodsCache()
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: podNamespace,
UID: podUID,
},
}
cache.add(pod)
pods := cache.list()
if len(pods) != 1 {
t.Fatalf("Expected 1 pod in cache, got %d", len(pods))
}
if pods[0].Name != podName || pods[0].Namespace != podNamespace || pods[0].UID != podUID {
t.Errorf("Pod data mismatch: got name=%s, namespace=%s, uid=%s", pods[0].Name, pods[0].Namespace, pods[0].UID)
}
})
t.Run("add multiple pods", func(t *testing.T) {
cache := newEvictedPodsCache()
pods := []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default", UID: "11111111-1111-1111-1111-111111111111"}},
{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "kube-system", UID: "22222222-2222-2222-2222-222222222222"}},
{ObjectMeta: metav1.ObjectMeta{Name: "pod3", Namespace: "default", UID: "33333333-3333-3333-3333-333333333333"}},
}
for _, pod := range pods {
cache.add(pod)
}
cachedPods := cache.list()
if len(cachedPods) != 3 {
t.Fatalf("Expected 3 pods in cache, got %d", len(cachedPods))
}
podMap := make(map[string]*evictedPodInfo)
for _, cachedPod := range cachedPods {
podMap[cachedPod.UID] = cachedPod
}
for _, pod := range pods {
cached, ok := podMap[string(pod.UID)]
if !ok {
t.Errorf("Pod with UID %s not found in cache", pod.UID)
continue
}
if cached.Name != pod.Name || cached.Namespace != pod.Namespace {
t.Errorf("Pod data mismatch for UID %s: got name=%s, namespace=%s", pod.UID, cached.Name, cached.Namespace)
}
}
})
t.Run("add duplicate pod updates entry", func(t *testing.T) {
const (
duplicateUID = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
updatedPodName = "pod1-new"
updatedPodNS = "kube-system"
)
cache := newEvictedPodsCache()
pod1 := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Namespace: "default",
UID: duplicateUID,
},
}
pod2 := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: updatedPodName,
Namespace: updatedPodNS,
UID: duplicateUID,
},
}
cache.add(pod1)
cache.add(pod2)
pods := cache.list()
if len(pods) != 1 {
t.Fatalf("Expected 1 pod in cache (duplicates should overwrite), got %d", len(pods))
}
if pods[0].Name != updatedPodName || pods[0].Namespace != updatedPodNS {
t.Errorf("Expected pod2 data, got name=%s, namespace=%s", pods[0].Name, pods[0].Namespace)
}
})
t.Run("list returns empty array for empty cache", func(t *testing.T) {
cache := newEvictedPodsCache()
pods := cache.list()
if pods == nil {
t.Fatal("Expected non-nil slice from list()")
}
if len(pods) != 0 {
t.Fatalf("Expected empty list, got %d pods", len(pods))
}
})
t.Run("list returns copies not references", func(t *testing.T) {
const originalPodName = "pod1"
cache := newEvictedPodsCache()
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: originalPodName,
Namespace: "default",
UID: "12345678-1234-1234-1234-123456789abc",
},
}
cache.add(pod)
pods1 := cache.list()
pods2 := cache.list()
if len(pods1) != 1 || len(pods2) != 1 {
t.Fatalf("Expected 1 pod in both lists")
}
pods1[0].Name = "modified"
if pods2[0].Name == "modified" {
t.Error("Modifying list result should not affect other list results (should be copies)")
}
pods3 := cache.list()
if pods3[0].Name != originalPodName {
t.Error("Cache data was modified, list() should return copies")
}
})
t.Run("clear empties the cache", func(t *testing.T) {
cache := newEvictedPodsCache()
cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default", UID: "aaaa0000-0000-0000-0000-000000000001"}})
cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "kube-system", UID: "bbbb0000-0000-0000-0000-000000000002"}})
if len(cache.list()) != 2 {
t.Fatal("Expected 2 pods before clear")
}
cache.clear()
pods := cache.list()
if len(pods) != 0 {
t.Fatalf("Expected empty cache after clear, got %d pods", len(pods))
}
})
t.Run("clear on empty cache is safe", func(t *testing.T) {
cache := newEvictedPodsCache()
cache.clear()
pods := cache.list()
if len(pods) != 0 {
t.Fatalf("Expected empty cache, got %d pods", len(pods))
}
})
t.Run("add after clear works correctly", func(t *testing.T) {
cache := newEvictedPodsCache()
cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default", UID: "00000001-0001-0001-0001-000000000001"}})
cache.clear()
cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "kube-system", UID: "00000002-0002-0002-0002-000000000002"}})
pods := cache.list()
if len(pods) != 1 {
t.Fatalf("Expected 1 pod after clear and add, got %d", len(pods))
}
if pods[0].Name != "pod2" {
t.Errorf("Expected pod2, got %s", pods[0].Name)
}
})
}
func TestPodEvictionReactionFncErrorHandling(t *testing.T) {
podsGVR := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
@@ -1614,7 +1363,7 @@ func TestEvictedPodRestorationInDryRun(t *testing.T) {
// Run descheduling cycle
klog.Infof("Running descheduling cycle %d", i)
descheduler.podEvictor.ResetCounters()
- descheduler.runProfiles(ctx, descheduler.client)
+ descheduler.runProfiles(ctx)
// Verify the pod was evicted (should not exist in fake client anymore)
_, err = kubeClientSandbox.fakeClient().CoreV1().Pods(p1.Namespace).Get(ctx, p1.Name, metav1.GetOptions{})

View File

@@ -0,0 +1,446 @@
/*
Copyright 2026 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package descheduler
import (
"context"
"fmt"
"sync"
"time"
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
policyv1 "k8s.io/api/policy/v1"
schedulingv1 "k8s.io/api/scheduling/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
fakeclientset "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)
// evictedPodInfo stores identifying information about a pod that was evicted during dry-run mode
type evictedPodInfo struct {
Namespace string
Name string
UID string
}
// evictedPodsCache is a thread-safe cache for tracking pods evicted during dry-run mode
type evictedPodsCache struct {
sync.RWMutex
pods map[string]*evictedPodInfo
}
func newEvictedPodsCache() *evictedPodsCache {
return &evictedPodsCache{
pods: make(map[string]*evictedPodInfo),
}
}
func (c *evictedPodsCache) add(pod *v1.Pod) {
c.Lock()
defer c.Unlock()
c.pods[string(pod.UID)] = &evictedPodInfo{
Namespace: pod.Namespace,
Name: pod.Name,
UID: string(pod.UID),
}
}
func (c *evictedPodsCache) list() []*evictedPodInfo {
c.RLock()
defer c.RUnlock()
pods := make([]*evictedPodInfo, 0, len(c.pods))
for _, pod := range c.pods {
podCopy := *pod
pods = append(pods, &podCopy)
}
return pods
}
func (c *evictedPodsCache) clear() {
c.Lock()
defer c.Unlock()
c.pods = make(map[string]*evictedPodInfo)
}
// kubeClientSandbox creates a sandbox environment with a fake client and informer factory
// that mirrors resources from a real client, useful for dry-run testing scenarios
type kubeClientSandbox struct {
fakeKubeClient *fakeclientset.Clientset
fakeFactory informers.SharedInformerFactory
resourceToInformer map[schema.GroupVersionResource]informers.GenericInformer
evictedPodsCache *evictedPodsCache
podEvictionReactionFnc func(*fakeclientset.Clientset, *evictedPodsCache) func(action core.Action) (bool, runtime.Object, error)
}
func newDefaultKubeClientSandbox(client clientset.Interface, sharedInformerFactory informers.SharedInformerFactory) (*kubeClientSandbox, error) {
return newKubeClientSandbox(client, sharedInformerFactory,
v1.SchemeGroupVersion.WithResource("pods"),
v1.SchemeGroupVersion.WithResource("nodes"),
v1.SchemeGroupVersion.WithResource("namespaces"),
schedulingv1.SchemeGroupVersion.WithResource("priorityclasses"),
policyv1.SchemeGroupVersion.WithResource("poddisruptionbudgets"),
v1.SchemeGroupVersion.WithResource("persistentvolumeclaims"),
)
}
func newKubeClientSandbox(client clientset.Interface, sharedInformerFactory informers.SharedInformerFactory, resources ...schema.GroupVersionResource) (*kubeClientSandbox, error) {
sandbox := &kubeClientSandbox{
resourceToInformer: make(map[schema.GroupVersionResource]informers.GenericInformer),
evictedPodsCache: newEvictedPodsCache(),
podEvictionReactionFnc: podEvictionReactionFnc,
}
sandbox.fakeKubeClient = fakeclientset.NewSimpleClientset()
// simulate a pod eviction by deleting a pod
sandbox.fakeKubeClient.PrependReactor("create", "pods", sandbox.podEvictionReactionFnc(sandbox.fakeKubeClient, sandbox.evictedPodsCache))
sandbox.fakeFactory = informers.NewSharedInformerFactory(sandbox.fakeKubeClient, 0)
for _, resource := range resources {
informer, err := sharedInformerFactory.ForResource(resource)
if err != nil {
return nil, err
}
sandbox.resourceToInformer[resource] = informer
}
// Register event handlers to sync changes from real client to fake client.
// These handlers will keep the fake client in sync with ongoing changes.
if err := sandbox.registerEventHandlers(); err != nil {
return nil, fmt.Errorf("error registering event handlers: %w", err)
}
return sandbox, nil
}
func (sandbox *kubeClientSandbox) registerEventHandlers() error {
for resource, informer := range sandbox.resourceToInformer {
// Create a local copy to avoid closure capture issue
resource := resource
_, err := sandbox.fakeFactory.ForResource(resource)
if err != nil {
return fmt.Errorf("error getting resource %s for fake factory: %w", resource, err)
}
_, err = informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
runtimeObj, ok := obj.(runtime.Object)
if !ok {
klog.ErrorS(nil, "object is not a runtime.Object", "resource", resource)
return
}
if err := sandbox.fakeKubeClient.Tracker().Add(runtimeObj); err != nil {
if !apierrors.IsAlreadyExists(err) {
klog.ErrorS(err, "failed to add object to fake client", "resource", resource)
}
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
runtimeObj, ok := newObj.(runtime.Object)
if !ok {
klog.ErrorS(nil, "object is not a runtime.Object", "resource", resource)
return
}
metaObj, err := meta.Accessor(runtimeObj)
if err != nil {
klog.ErrorS(err, "failed to get object metadata", "resource", resource)
return
}
if err := sandbox.fakeKubeClient.Tracker().Update(resource, runtimeObj, metaObj.GetNamespace()); err != nil {
klog.ErrorS(err, "failed to update object in fake client", "resource", resource)
}
},
DeleteFunc: func(obj interface{}) {
// Handle tombstone case where the object might be wrapped
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = tombstone.Obj
}
runtimeObj, ok := obj.(runtime.Object)
if !ok {
klog.ErrorS(nil, "object is not a runtime.Object", "resource", resource)
return
}
metaObj, err := meta.Accessor(runtimeObj)
if err != nil {
klog.ErrorS(err, "failed to get object metadata", "resource", resource)
return
}
if err := sandbox.fakeKubeClient.Tracker().Delete(resource, metaObj.GetNamespace(), metaObj.GetName()); err != nil {
klog.ErrorS(err, "failed to delete object from fake client", "resource", resource)
}
},
})
if err != nil {
return fmt.Errorf("error adding event handler for resource %s: %w", resource, err)
}
}
return nil
}
func (sandbox *kubeClientSandbox) fakeClient() *fakeclientset.Clientset {
return sandbox.fakeKubeClient
}
func (sandbox *kubeClientSandbox) fakeSharedInformerFactory() informers.SharedInformerFactory {
return sandbox.fakeFactory
}
func (sandbox *kubeClientSandbox) reset() {
sandbox.evictedPodsCache.clear()
}
// hasObjectInIndexer checks if an object exists in the fake indexer for the specified resource
func (sandbox *kubeClientSandbox) hasObjectInIndexer(resource schema.GroupVersionResource, namespace, name, uid string) (bool, error) {
informer, err := sandbox.fakeFactory.ForResource(resource)
if err != nil {
return false, fmt.Errorf("error getting informer for resource %s: %w", resource, err)
}
key := cache.MetaObjectToName(&metav1.ObjectMeta{Namespace: namespace, Name: name}).String()
obj, exists, err := informer.Informer().GetIndexer().GetByKey(key)
if err != nil {
return false, err
}
if !exists {
return false, nil
}
metaObj, err := meta.Accessor(obj)
if err != nil {
return false, fmt.Errorf("failed to get object metadata: %w", err)
}
return string(metaObj.GetUID()) == uid, nil
}
// hasRuntimeObjectInIndexer checks if a runtime.Object exists in the fake indexer by detecting its resource type
func (sandbox *kubeClientSandbox) hasRuntimeObjectInIndexer(obj runtime.Object) (bool, error) {
// Get metadata accessor to extract namespace and name
metaObj, err := meta.Accessor(obj)
if err != nil {
return false, fmt.Errorf("failed to get object metadata: %w", err)
}
// Get the GVK from the object using TypeMeta
gvk := obj.GetObjectKind().GroupVersionKind()
if gvk.Empty() {
return false, fmt.Errorf("no GroupVersionKind found for object")
}
// Use the GVK to construct the GVR by pluralizing the kind
plural, _ := meta.UnsafeGuessKindToResource(gvk)
gvr := schema.GroupVersionResource{
Group: gvk.Group,
Version: gvk.Version,
Resource: plural.Resource,
}
return sandbox.hasObjectInIndexer(gvr, metaObj.GetNamespace(), metaObj.GetName(), string(metaObj.GetUID()))
}
// shouldSkipPodWait checks if a pod still exists in the fake client with the expected UID.
// Returns (shouldSkip, error). shouldSkip is true if we should skip waiting for this pod.
// Returns an error for unexpected failures (non-NotFound errors).
func (sandbox *kubeClientSandbox) shouldSkipPodWait(ctx context.Context, pod *evictedPodInfo) (bool, error) {
// Check if the pod still exists in the fake client (it could have been deleted from the real client
// and the deletion propagated via event handlers in the meantime)
fakePod, err := sandbox.fakeClient().CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
klog.V(3).InfoS("Pod no longer exists in fake client, skipping wait", "namespace", pod.Namespace, "name", pod.Name)
return true, nil
}
return false, fmt.Errorf("error getting pod %s/%s from fake client: %w", pod.Namespace, pod.Name, err)
}
if string(fakePod.UID) != pod.UID {
klog.V(3).InfoS("Pod UID mismatch in fake client, skipping wait", "namespace", pod.Namespace, "name", pod.Name, "expectedUID", pod.UID, "actualUID", string(fakePod.UID))
return true, nil
}
return false, nil
}
func waitForPodsCondition(ctx context.Context, pods []*evictedPodInfo, checkFn func(*evictedPodInfo) (bool, error), successMsg string) error {
if len(pods) == 0 {
return nil
}
err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
for _, pod := range pods {
satisfied, err := checkFn(pod)
if err != nil {
return false, err
}
if !satisfied {
return false, nil
}
}
return true, nil
})
if err != nil {
return err
}
klog.V(4).InfoS(successMsg)
return nil
}
// restoreEvictedPods restores pods from the evicted pods cache back to the fake client
func (sandbox *kubeClientSandbox) restoreEvictedPods(ctx context.Context) error {
podInformer, ok := sandbox.resourceToInformer[v1.SchemeGroupVersion.WithResource("pods")]
if !ok {
return fmt.Errorf("pod informer not found in resourceToInformer")
}
evictedPods := sandbox.evictedPodsCache.list()
// First wait loop: Check that all evicted pods are cleared from the indexers.
// This ensures the eviction has fully propagated through the fake informer's indexer.
// We check both existence and UID match to handle cases where a pod was deleted and
// recreated with the same name but different UID.
if err := waitForPodsCondition(ctx, evictedPods, func(pod *evictedPodInfo) (bool, error) {
exists, err := sandbox.hasObjectInIndexer(v1.SchemeGroupVersion.WithResource("pods"), pod.Namespace, pod.Name, pod.UID)
if err != nil {
klog.V(4).InfoS("Error checking indexer for pod", "namespace", pod.Namespace, "name", pod.Name, "error", err)
return false, nil
}
if exists {
klog.V(4).InfoS("Pod with matching UID still exists in fake indexer, waiting", "namespace", pod.Namespace, "name", pod.Name, "uid", pod.UID)
return false, nil
}
klog.V(4).InfoS("Pod with matching UID no longer in fake indexer", "namespace", pod.Namespace, "name", pod.Name, "uid", pod.UID)
return true, nil
}, "All evicted pods removed from fake indexer"); err != nil {
return fmt.Errorf("timeout waiting for evicted pods to be removed from fake indexer: %w", err)
}
var restoredPods []*evictedPodInfo
for _, podInfo := range sandbox.evictedPodsCache.list() {
obj, err := podInformer.Lister().ByNamespace(podInfo.Namespace).Get(podInfo.Name)
if err != nil {
klog.V(3).InfoS("Pod not found in real client, skipping restoration", "namespace", podInfo.Namespace, "name", podInfo.Name, "error", err)
continue
}
pod, ok := obj.(*v1.Pod)
if !ok {
klog.ErrorS(nil, "Object is not a pod", "namespace", podInfo.Namespace, "name", podInfo.Name)
continue
}
if string(pod.UID) != podInfo.UID {
klog.V(3).InfoS("Pod UID mismatch, skipping restoration", "namespace", podInfo.Namespace, "name", podInfo.Name, "expectedUID", podInfo.UID, "actualUID", string(pod.UID))
continue
}
if err = sandbox.fakeKubeClient.Tracker().Add(pod); err != nil && !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("failed to restore pod %s/%s to fake client: %w", podInfo.Namespace, podInfo.Name, err)
}
klog.V(4).InfoS("Successfully restored pod to fake client", "namespace", podInfo.Namespace, "name", podInfo.Name, "uid", podInfo.UID)
restoredPods = append(restoredPods, podInfo)
}
// Second wait loop: Make sure the evicted pods are added back to the fake client.
// This ensures the restored pods are accessible through the fake informer's lister.
if err := waitForPodsCondition(ctx, restoredPods, func(pod *evictedPodInfo) (bool, error) {
podObj, err := sandbox.fakeFactory.Core().V1().Pods().Lister().Pods(pod.Namespace).Get(pod.Name)
if err != nil {
klog.V(4).InfoS("Pod not yet accessible in fake informer, waiting", "namespace", pod.Namespace, "name", pod.Name)
shouldSkip, checkErr := sandbox.shouldSkipPodWait(ctx, pod)
if checkErr != nil {
return false, checkErr
}
if shouldSkip {
return true, nil
}
return false, nil
}
klog.V(4).InfoS("Pod accessible in fake informer", "namespace", pod.Namespace, "name", pod.Name, "node", podObj.Spec.NodeName)
return true, nil
}, "All restored pods are accessible in fake informer"); err != nil {
return fmt.Errorf("timeout waiting for pods to be accessible in fake informer: %w", err)
}
// Third wait loop: Make sure the indexers see the added pods.
// This is important to ensure each descheduling cycle can see all the restored pods.
// Without this wait, the next cycle might not see the restored pods in the indexer yet.
if err := waitForPodsCondition(ctx, restoredPods, func(pod *evictedPodInfo) (bool, error) {
exists, err := sandbox.hasObjectInIndexer(v1.SchemeGroupVersion.WithResource("pods"), pod.Namespace, pod.Name, pod.UID)
if err != nil {
klog.V(4).InfoS("Error checking indexer for restored pod", "namespace", pod.Namespace, "name", pod.Name, "error", err)
return false, nil
}
if !exists {
klog.V(4).InfoS("Restored pod not yet in fake indexer, waiting", "namespace", pod.Namespace, "name", pod.Name)
shouldSkip, checkErr := sandbox.shouldSkipPodWait(ctx, pod)
if checkErr != nil {
return false, checkErr
}
if shouldSkip {
return true, nil
}
return false, nil
}
klog.V(4).InfoS("Restored pod now in fake indexer", "namespace", pod.Namespace, "name", pod.Name)
return true, nil
}, "All restored pods are now in fake indexer"); err != nil {
return fmt.Errorf("timeout waiting for restored pods to appear in fake indexer: %w", err)
}
return nil
}
func podEvictionReactionFnc(fakeClient *fakeclientset.Clientset, evictedCache *evictedPodsCache) func(action core.Action) (bool, runtime.Object, error) {
return func(action core.Action) (bool, runtime.Object, error) {
if action.GetSubresource() == "eviction" {
createAct, matched := action.(core.CreateActionImpl)
if !matched {
return false, nil, fmt.Errorf("unable to convert action to core.CreateActionImpl")
}
eviction, matched := createAct.Object.(*policy.Eviction)
if !matched {
return false, nil, fmt.Errorf("unable to convert action object into *policy.Eviction")
}
podObj, err := fakeClient.Tracker().Get(action.GetResource(), eviction.GetNamespace(), eviction.GetName())
if err == nil {
if pod, ok := podObj.(*v1.Pod); ok {
evictedCache.add(pod)
} else {
return false, nil, fmt.Errorf("unable to convert object to *v1.Pod for %v/%v", eviction.GetNamespace(), eviction.GetName())
}
} else if !apierrors.IsNotFound(err) {
return false, nil, fmt.Errorf("unable to get pod %v/%v: %v", eviction.GetNamespace(), eviction.GetName(), err)
}
if err := fakeClient.Tracker().Delete(action.GetResource(), eviction.GetNamespace(), eviction.GetName()); err != nil {
return false, nil, fmt.Errorf("unable to delete pod %v/%v: %v", eviction.GetNamespace(), eviction.GetName(), err)
}
return true, nil, nil
}
// fallback to the default reactor
return false, nil, nil
}
}
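
For orientation before the tests in the next file, here is a condensed usage sketch of the sandbox in a single dry-run cycle. It is a hypothetical helper assembled from the functions defined above and the test flow below; exampleDryRunCycle itself is not part of the change.

// Hypothetical usage sketch (not part of this commit); it assumes the descheduler
// package above, where newKubeClientSandbox and restoreEvictedPods are defined.
package descheduler

import (
	"context"

	v1 "k8s.io/api/core/v1"
	policy "k8s.io/api/policy/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	fakeclientset "k8s.io/client-go/kubernetes/fake"
)

// exampleDryRunCycle wires a sandbox around a seeded fake "real" client, performs
// one dry-run eviction against the sandbox client, and restores the evicted pod
// so a following descheduling cycle would see it again.
func exampleDryRunCycle(ctx context.Context, node *v1.Node, pod *v1.Pod) error {
	realClient := fakeclientset.NewSimpleClientset(node, pod)
	realFactory := informers.NewSharedInformerFactoryWithOptions(realClient, 0)
	// Register the informers the sandbox mirrors before constructing it.
	_ = realFactory.Core().V1().Pods().Informer()
	_ = realFactory.Core().V1().Nodes().Informer()

	sandbox, err := newKubeClientSandbox(realClient, realFactory,
		v1.SchemeGroupVersion.WithResource("pods"),
		v1.SchemeGroupVersion.WithResource("nodes"),
	)
	if err != nil {
		return err
	}

	// Start the fake factory first so its event handlers are in place before
	// objects propagate from the real factory.
	sandbox.fakeSharedInformerFactory().Start(ctx.Done())
	sandbox.fakeSharedInformerFactory().WaitForCacheSync(ctx.Done())
	realFactory.Start(ctx.Done())
	realFactory.WaitForCacheSync(ctx.Done())

	// A dry-run eviction only deletes the pod from the fake client and records it
	// in the evicted-pods cache via the prepended reactor.
	if err := sandbox.fakeClient().CoreV1().Pods(pod.Namespace).EvictV1(ctx, &policy.Eviction{
		ObjectMeta: metav1.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace},
	}); err != nil {
		return err
	}

	// Restore the evicted pod for the next cycle, then clear the cache.
	if err := sandbox.restoreEvictedPods(ctx); err != nil {
		return err
	}
	sandbox.reset()
	return nil
}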

View File

@@ -0,0 +1,582 @@
/*
Copyright 2026 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package descheduler
import (
"context"
"testing"
"time"
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
fakeclientset "k8s.io/client-go/kubernetes/fake"
"sigs.k8s.io/descheduler/test"
)
// setupTestSandbox creates and initializes a kubeClientSandbox for testing.
// It creates a fake client, informer factory, registers pod and node informers,
// creates the sandbox, and starts both factories.
func setupTestSandbox(ctx context.Context, t *testing.T, initialObjects ...runtime.Object) (*kubeClientSandbox, informers.SharedInformerFactory, clientset.Interface) {
// Create a "real" fake client to act as the source of truth
realClient := fakeclientset.NewSimpleClientset(initialObjects...)
realFactory := informers.NewSharedInformerFactoryWithOptions(realClient, 0)
// Register pods and nodes informers BEFORE creating the sandbox
_ = realFactory.Core().V1().Pods().Informer()
_ = realFactory.Core().V1().Nodes().Informer()
// Create the sandbox with only pods and nodes resources
sandbox, err := newKubeClientSandbox(realClient, realFactory,
v1.SchemeGroupVersion.WithResource("pods"),
v1.SchemeGroupVersion.WithResource("nodes"),
)
if err != nil {
t.Fatalf("Failed to create kubeClientSandbox: %v", err)
}
// fake factory created by newKubeClientSandbox needs to be started before
// the "real" fake client factory to have all handlers registered
// to get complete propagation
sandbox.fakeSharedInformerFactory().Start(ctx.Done())
sandbox.fakeSharedInformerFactory().WaitForCacheSync(ctx.Done())
realFactory.Start(ctx.Done())
realFactory.WaitForCacheSync(ctx.Done())
return sandbox, realFactory, realClient
}
func TestKubeClientSandboxEventHandlers(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
sandbox, realFactory, realClient := setupTestSandbox(ctx, t)
// Register a third resource (secrets) in the real factory AFTER sandbox creation
// This should NOT be synced to the fake client
_ = realFactory.Core().V1().Secrets().Informer()
// Create test objects
testPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
Namespace: "default",
UID: "pod-uid-12345",
},
Spec: v1.PodSpec{
NodeName: "test-node",
},
}
testNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node",
UID: "node-uid-67890",
},
}
testSecret := &v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "test-secret",
Namespace: "default",
UID: "secret-uid-abcde",
},
Data: map[string][]byte{
"key": []byte("value"),
},
}
// Add objects to the real client
var err error
_, err = realClient.CoreV1().Pods(testPod.Namespace).Create(ctx, testPod, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create pod in real client: %v", err)
}
_, err = realClient.CoreV1().Nodes().Create(ctx, testNode, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create node in real client: %v", err)
}
_, err = realClient.CoreV1().Secrets(testSecret.Namespace).Create(ctx, testSecret, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create secret in real client: %v", err)
}
// Helper function to wait for a resource to appear in the sandbox's fake client indexer
waitForResourceInIndexer := func(resourceType, namespace, name, uid, description string) error {
t.Logf("Waiting for %s to appear in fake client indexer...", description)
return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
exists, err := sandbox.hasObjectInIndexer(
v1.SchemeGroupVersion.WithResource(resourceType),
namespace,
name,
uid,
)
if err != nil {
t.Logf("Error checking %s in indexer: %v", description, err)
return false, nil
}
if exists {
t.Logf("%s appeared in fake client indexer", description)
return true, nil
}
return false, nil
})
}
// Wait for the pod to appear in the sandbox's fake client indexer
if err := waitForResourceInIndexer("pods", testPod.Namespace, testPod.Name, string(testPod.UID), "pod"); err != nil {
t.Fatalf("Pod did not appear in fake client indexer within timeout: %v", err)
}
// Wait for the node to appear in the sandbox's fake client indexer
if err := waitForResourceInIndexer("nodes", "", testNode.Name, string(testNode.UID), "node"); err != nil {
t.Fatalf("Node did not appear in fake client indexer within timeout: %v", err)
}
// Verify the pod can be retrieved from the fake client
retrievedPod, err := sandbox.fakeClient().CoreV1().Pods(testPod.Namespace).Get(ctx, testPod.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to retrieve pod from fake client: %v", err)
}
if retrievedPod.Namespace != testPod.Namespace || retrievedPod.Name != testPod.Name || retrievedPod.UID != testPod.UID {
t.Errorf("Retrieved pod mismatch: got namespace=%s name=%s uid=%s, want namespace=%s name=%s uid=%s",
retrievedPod.Namespace, retrievedPod.Name, retrievedPod.UID, testPod.Namespace, testPod.Name, testPod.UID)
}
t.Logf("Successfully retrieved pod from fake client: %s/%s", retrievedPod.Namespace, retrievedPod.Name)
// Verify the node can be retrieved from the fake client
retrievedNode, err := sandbox.fakeClient().CoreV1().Nodes().Get(ctx, testNode.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to retrieve node from fake client: %v", err)
}
if retrievedNode.Name != testNode.Name || retrievedNode.UID != testNode.UID {
t.Errorf("Retrieved node mismatch: got name=%s uid=%s, want name=%s uid=%s",
retrievedNode.Name, retrievedNode.UID, testNode.Name, testNode.UID)
}
t.Logf("Successfully retrieved node from fake client: %s", retrievedNode.Name)
// Wait a bit longer and verify the secret does NOT appear in the fake client indexer
// because secrets were registered AFTER the sandbox was created
t.Log("Verifying secret does NOT appear in fake client indexer...")
time.Sleep(500 * time.Millisecond) // Give extra time to ensure it's not just a timing issue
// First, verify we can get the informer for secrets in the fake factory
secretInformer, err := sandbox.fakeSharedInformerFactory().ForResource(v1.SchemeGroupVersion.WithResource("secrets"))
if err != nil {
t.Logf("Expected: Cannot get secret informer from fake factory: %v", err)
} else {
// If we can get the informer, check if the secret exists in it
key := "default/test-secret"
_, exists, err := secretInformer.Informer().GetIndexer().GetByKey(key)
if err != nil {
t.Logf("Error checking secret in fake indexer: %v", err)
}
if exists {
t.Error("Secret should NOT exist in fake client indexer (it was registered after sandbox creation)")
} else {
t.Log("Correctly verified: Secret does not exist in fake client indexer")
}
}
// Also verify that attempting to get the secret directly from fake client should fail
_, err = sandbox.fakeClient().CoreV1().Secrets(testSecret.Namespace).Get(ctx, testSecret.Name, metav1.GetOptions{})
if err == nil {
t.Error("Secret should NOT be retrievable from fake client (it was not synced)")
} else {
t.Logf("Correctly verified: Secret not retrievable from fake client: %v", err)
}
// Verify the secret IS in the real client (sanity check)
_, err = realClient.CoreV1().Secrets(testSecret.Namespace).Get(ctx, testSecret.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Secret should exist in real client but got error: %v", err)
}
t.Log("Sanity check passed: Secret exists in real client")
}
func TestKubeClientSandboxReset(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
node1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
p1 := test.BuildTestPod("p1", 100, 0, node1.Name, test.SetRSOwnerRef)
p2 := test.BuildTestPod("p2", 100, 0, node1.Name, test.SetRSOwnerRef)
sandbox, _, _ := setupTestSandbox(ctx, t, node1, p1, p2)
eviction1 := &policy.Eviction{
ObjectMeta: metav1.ObjectMeta{
Name: p1.Name,
Namespace: p1.Namespace,
},
}
eviction2 := &policy.Eviction{
ObjectMeta: metav1.ObjectMeta{
Name: p2.Name,
Namespace: p2.Namespace,
},
}
if err := sandbox.fakeClient().CoreV1().Pods(p1.Namespace).EvictV1(context.TODO(), eviction1); err != nil {
t.Fatalf("Error evicting p1: %v", err)
}
if err := sandbox.fakeClient().CoreV1().Pods(p2.Namespace).EvictV1(context.TODO(), eviction2); err != nil {
t.Fatalf("Error evicting p2: %v", err)
}
evictedPods := sandbox.evictedPodsCache.list()
if len(evictedPods) != 2 {
t.Fatalf("Expected 2 evicted pods in cache, but got %d", len(evictedPods))
}
t.Logf("Evicted pods in cache before reset: %d", len(evictedPods))
for _, evictedPod := range evictedPods {
if evictedPod.Namespace == "" || evictedPod.Name == "" || evictedPod.UID == "" {
t.Errorf("Evicted pod has empty fields: namespace=%s, name=%s, uid=%s", evictedPod.Namespace, evictedPod.Name, evictedPod.UID)
}
t.Logf("Evicted pod: %s/%s (UID: %s)", evictedPod.Namespace, evictedPod.Name, evictedPod.UID)
}
sandbox.reset()
evictedPodsAfterReset := sandbox.evictedPodsCache.list()
if len(evictedPodsAfterReset) != 0 {
t.Fatalf("Expected cache to be empty after reset, but found %d pods", len(evictedPodsAfterReset))
}
t.Logf("Successfully verified cache is empty after reset")
}
func TestKubeClientSandboxRestoreEvictedPods(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
node1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
// Create test pods
pod1 := test.BuildTestPod("pod1", 100, 0, node1.Name, test.SetRSOwnerRef)
pod2 := test.BuildTestPod("pod2", 100, 0, node1.Name, test.SetRSOwnerRef)
pod3 := test.BuildTestPod("pod3", 100, 0, node1.Name, test.SetRSOwnerRef)
pod4 := test.BuildTestPod("pod4", 100, 0, node1.Name, test.SetRSOwnerRef)
sandbox, _, realClient := setupTestSandbox(ctx, t, node1, pod1, pod2, pod3, pod4)
// Evict all pods
for _, pod := range []*v1.Pod{pod1, pod2, pod3, pod4} {
eviction := &policy.Eviction{
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
},
}
if err := sandbox.fakeClient().CoreV1().Pods(pod.Namespace).EvictV1(ctx, eviction); err != nil {
t.Fatalf("Error evicting %s: %v", pod.Name, err)
}
}
// Delete pod2 from real client to simulate it being deleted (should skip restoration)
if err := realClient.CoreV1().Pods(pod2.Namespace).Delete(ctx, pod2.Name, metav1.DeleteOptions{}); err != nil {
t.Fatalf("Error deleting pod2 from real client: %v", err)
}
// Delete and recreate pod3 with different UID in real client (should skip restoration)
if err := realClient.CoreV1().Pods(pod3.Namespace).Delete(ctx, pod3.Name, metav1.DeleteOptions{}); err != nil {
t.Fatalf("Error deleting pod3 from real client: %v", err)
}
pod3New := test.BuildTestPod("pod3", 100, 0, node1.Name, test.SetRSOwnerRef)
// Ensure pod3New has a different UID from pod3
if pod3New.UID == pod3.UID {
pod3New.UID = "new-uid-for-pod3"
}
if _, err := realClient.CoreV1().Pods(pod3New.Namespace).Create(ctx, pod3New, metav1.CreateOptions{}); err != nil {
t.Fatalf("Error recreating pod3 in real client: %v", err)
}
// Verify evicted pods are in cache
evictedPods := sandbox.evictedPodsCache.list()
if len(evictedPods) != 4 {
t.Fatalf("Expected 4 evicted pods in cache, got %d", len(evictedPods))
}
t.Logf("Evicted pods in cache: %d", len(evictedPods))
// Call restoreEvictedPods
t.Log("Calling restoreEvictedPods...")
if err := sandbox.restoreEvictedPods(ctx); err != nil {
t.Fatalf("restoreEvictedPods failed: %v", err)
}
// Verify pod1 and pod4 were restored (exists in fake client with matching UID and accessible via indexer)
for _, pod := range []*v1.Pod{pod1, pod4} {
// Check restoration via fake client
restoredPod, err := sandbox.fakeClient().CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
if err != nil {
t.Errorf("%s should have been restored to fake client: %v", pod.Name, err)
} else {
if restoredPod.UID != pod.UID {
t.Errorf("Restored %s UID mismatch: got %s, want %s", pod.Name, restoredPod.UID, pod.UID)
}
t.Logf("Successfully verified %s restoration: %s/%s (UID: %s)", pod.Name, restoredPod.Namespace, restoredPod.Name, restoredPod.UID)
}
// Check accessibility via indexer
exists, err := sandbox.hasObjectInIndexer(v1.SchemeGroupVersion.WithResource("pods"), pod.Namespace, pod.Name, string(pod.UID))
if err != nil {
t.Errorf("Error checking %s in indexer: %v", pod.Name, err)
}
if !exists {
t.Errorf("%s should exist in fake indexer after restoration", pod.Name)
} else {
t.Logf("Successfully verified %s exists in fake indexer", pod.Name)
}
}
// Verify pod2 was NOT restored (deleted from real client)
_, err := sandbox.fakeClient().CoreV1().Pods(pod2.Namespace).Get(ctx, pod2.Name, metav1.GetOptions{})
if err == nil {
t.Error("pod2 should NOT have been restored (was deleted from real client)")
} else {
t.Logf("Correctly verified pod2 was not restored: %v", err)
}
// Verify pod3 was NOT restored with old UID (UID mismatch case)
// Note: pod3 may exist in fake client with NEW UID due to event handler syncing,
// but it should NOT have been restored with the OLD UID from evicted cache
pod3InFake, err := sandbox.fakeClient().CoreV1().Pods(pod3.Namespace).Get(ctx, pod3.Name, metav1.GetOptions{})
if err == nil {
// Pod3 exists, but it should have the NEW UID, not the old one
if pod3InFake.UID == pod3.UID {
t.Error("pod3 should NOT have been restored with old UID (UID mismatch should prevent restoration)")
} else {
t.Logf("Correctly verified pod3 has new UID (%s), not old UID (%s) - restoration was skipped", pod3InFake.UID, pod3.UID)
}
} else {
// Pod3 doesn't exist - this is also acceptable (event handlers haven't synced it yet)
t.Logf("pod3 not found in fake client: %v", err)
}
// Verify evicted pods cache is still intact (restoreEvictedPods doesn't clear it)
evictedPodsAfter := sandbox.evictedPodsCache.list()
if len(evictedPodsAfter) != 4 {
t.Errorf("Expected evicted pods cache to still have 4 entries, got %d", len(evictedPodsAfter))
}
}
func TestKubeClientSandboxRestoreEvictedPodsEmptyCache(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
node1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
sandbox, _, _ := setupTestSandbox(ctx, t, node1)
// Call restoreEvictedPods with empty cache - should be a no-op
t.Log("Calling restoreEvictedPods with empty cache...")
if err := sandbox.restoreEvictedPods(ctx); err != nil {
t.Fatalf("restoreEvictedPods should succeed with empty cache: %v", err)
}
t.Log("Successfully verified restoreEvictedPods handles empty cache")
}
func TestEvictedPodsCache(t *testing.T) {
t.Run("add single pod", func(t *testing.T) {
const (
podName = "pod1"
podNamespace = "default"
podUID = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
)
cache := newEvictedPodsCache()
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: podNamespace,
UID: podUID,
},
}
cache.add(pod)
pods := cache.list()
if len(pods) != 1 {
t.Fatalf("Expected 1 pod in cache, got %d", len(pods))
}
if pods[0].Name != podName || pods[0].Namespace != podNamespace || pods[0].UID != podUID {
t.Errorf("Pod data mismatch: got name=%s, namespace=%s, uid=%s", pods[0].Name, pods[0].Namespace, pods[0].UID)
}
})
t.Run("add multiple pods", func(t *testing.T) {
cache := newEvictedPodsCache()
pods := []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default", UID: "11111111-1111-1111-1111-111111111111"}},
{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "kube-system", UID: "22222222-2222-2222-2222-222222222222"}},
{ObjectMeta: metav1.ObjectMeta{Name: "pod3", Namespace: "default", UID: "33333333-3333-3333-3333-333333333333"}},
}
for _, pod := range pods {
cache.add(pod)
}
cachedPods := cache.list()
if len(cachedPods) != 3 {
t.Fatalf("Expected 3 pods in cache, got %d", len(cachedPods))
}
podMap := make(map[string]*evictedPodInfo)
for _, cachedPod := range cachedPods {
podMap[cachedPod.UID] = cachedPod
}
for _, pod := range pods {
cached, ok := podMap[string(pod.UID)]
if !ok {
t.Errorf("Pod with UID %s not found in cache", pod.UID)
continue
}
if cached.Name != pod.Name || cached.Namespace != pod.Namespace {
t.Errorf("Pod data mismatch for UID %s: got name=%s, namespace=%s", pod.UID, cached.Name, cached.Namespace)
}
}
})
t.Run("add duplicate pod updates entry", func(t *testing.T) {
const (
duplicateUID = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
updatedPodName = "pod1-new"
updatedPodNS = "kube-system"
)
cache := newEvictedPodsCache()
pod1 := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Namespace: "default",
UID: duplicateUID,
},
}
pod2 := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: updatedPodName,
Namespace: updatedPodNS,
UID: duplicateUID,
},
}
cache.add(pod1)
cache.add(pod2)
pods := cache.list()
if len(pods) != 1 {
t.Fatalf("Expected 1 pod in cache (duplicates should overwrite), got %d", len(pods))
}
if pods[0].Name != updatedPodName || pods[0].Namespace != updatedPodNS {
t.Errorf("Expected pod2 data, got name=%s, namespace=%s", pods[0].Name, pods[0].Namespace)
}
})
t.Run("list returns empty array for empty cache", func(t *testing.T) {
cache := newEvictedPodsCache()
pods := cache.list()
if pods == nil {
t.Fatal("Expected non-nil slice from list()")
}
if len(pods) != 0 {
t.Fatalf("Expected empty list, got %d pods", len(pods))
}
})
t.Run("list returns copies not references", func(t *testing.T) {
const originalPodName = "pod1"
cache := newEvictedPodsCache()
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: originalPodName,
Namespace: "default",
UID: "12345678-1234-1234-1234-123456789abc",
},
}
cache.add(pod)
pods1 := cache.list()
pods2 := cache.list()
if len(pods1) != 1 || len(pods2) != 1 {
t.Fatalf("Expected 1 pod in both lists")
}
pods1[0].Name = "modified"
if pods2[0].Name == "modified" {
t.Error("Modifying list result should not affect other list results (should be copies)")
}
pods3 := cache.list()
if pods3[0].Name != originalPodName {
t.Error("Cache data was modified, list() should return copies")
}
})
t.Run("clear empties the cache", func(t *testing.T) {
cache := newEvictedPodsCache()
cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default", UID: "aaaa0000-0000-0000-0000-000000000001"}})
cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "kube-system", UID: "bbbb0000-0000-0000-0000-000000000002"}})
if len(cache.list()) != 2 {
t.Fatal("Expected 2 pods before clear")
}
cache.clear()
pods := cache.list()
if len(pods) != 0 {
t.Fatalf("Expected empty cache after clear, got %d pods", len(pods))
}
})
t.Run("clear on empty cache is safe", func(t *testing.T) {
cache := newEvictedPodsCache()
cache.clear()
pods := cache.list()
if len(pods) != 0 {
t.Fatalf("Expected empty cache, got %d pods", len(pods))
}
})
t.Run("add after clear works correctly", func(t *testing.T) {
cache := newEvictedPodsCache()
cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default", UID: "00000001-0001-0001-0001-000000000001"}})
cache.clear()
cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "kube-system", UID: "00000002-0002-0002-0002-000000000002"}})
pods := cache.list()
if len(pods) != 1 {
t.Fatalf("Expected 1 pod after clear and add, got %d", len(pods))
}
if pods[0].Name != "pod2" {
t.Errorf("Expected pod2, got %s", pods[0].Name)
}
})
}