1
0
mirror of https://github.com/kubernetes-sigs/descheduler.git synced 2026-01-25 20:59:28 +01:00

fix(kubeClientSandbox): do not wait for pods in the fake indexers if they are already deleted

This commit is contained in:
Jan Chaloupka
2026-01-24 13:56:54 +01:00
parent 45dc5a20d3
commit 263db33052
2 changed files with 198 additions and 21 deletions

View File

@@ -215,19 +215,27 @@ func (sandbox *kubeClientSandbox) reset() {
} }
// hasObjectInIndexer checks if an object exists in the fake indexer for the specified resource // hasObjectInIndexer checks if an object exists in the fake indexer for the specified resource
func (sandbox *kubeClientSandbox) hasObjectInIndexer(resource schema.GroupVersionResource, namespace, name string) (bool, error) { func (sandbox *kubeClientSandbox) hasObjectInIndexer(resource schema.GroupVersionResource, namespace, name, uid string) (bool, error) {
informer, err := sandbox.fakeFactory.ForResource(resource) informer, err := sandbox.fakeFactory.ForResource(resource)
if err != nil { if err != nil {
return false, fmt.Errorf("error getting informer for resource %s: %w", resource, err) return false, fmt.Errorf("error getting informer for resource %s: %w", resource, err)
} }
key := cache.MetaObjectToName(&metav1.ObjectMeta{Namespace: namespace, Name: name}).String() key := cache.MetaObjectToName(&metav1.ObjectMeta{Namespace: namespace, Name: name}).String()
_, exists, err := informer.Informer().GetIndexer().GetByKey(key) obj, exists, err := informer.Informer().GetIndexer().GetByKey(key)
if err != nil { if err != nil {
return false, err return false, err
} }
return exists, nil if !exists {
return false, nil
}
metaObj, err := meta.Accessor(obj)
if err != nil {
return false, fmt.Errorf("failed to get object metadata: %w", err)
}
return string(metaObj.GetUID()) == uid, nil
} }
// hasRuntimeObjectInIndexer checks if a runtime.Object exists in the fake indexer by detecting its resource type // hasRuntimeObjectInIndexer checks if a runtime.Object exists in the fake indexer by detecting its resource type
@@ -252,7 +260,28 @@ func (sandbox *kubeClientSandbox) hasRuntimeObjectInIndexer(obj runtime.Object)
Resource: plural.Resource, Resource: plural.Resource,
} }
return sandbox.hasObjectInIndexer(gvr, metaObj.GetNamespace(), metaObj.GetName()) return sandbox.hasObjectInIndexer(gvr, metaObj.GetNamespace(), metaObj.GetName(), string(metaObj.GetUID()))
}
// shouldSkipPodWait checks if a pod still exists in the fake client with the expected UID.
// Returns (shouldSkip, error). shouldSkip is true if we should skip waiting for this pod.
// Returns an error for unexpected failures (non-NotFound errors).
func (sandbox *kubeClientSandbox) shouldSkipPodWait(ctx context.Context, pod *evictedPodInfo) (bool, error) {
// Check if the pod still exists in the fake client (it could have been deleted from the real client
// and the deletion propagated via event handlers in the meantime)
fakePod, err := sandbox.fakeClient().CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
klog.V(3).InfoS("Pod no longer exists in fake client, skipping wait", "namespace", pod.Namespace, "name", pod.Name)
return true, nil
}
return false, fmt.Errorf("error getting pod %s/%s from fake client: %w", pod.Namespace, pod.Name, err)
}
if string(fakePod.UID) != pod.UID {
klog.V(3).InfoS("Pod UID mismatch in fake client, skipping wait", "namespace", pod.Namespace, "name", pod.Name, "expectedUID", pod.UID, "actualUID", string(fakePod.UID))
return true, nil
}
return false, nil
} }
func waitForPodsCondition(ctx context.Context, pods []*evictedPodInfo, checkFn func(*evictedPodInfo) (bool, error), successMsg string) error { func waitForPodsCondition(ctx context.Context, pods []*evictedPodInfo, checkFn func(*evictedPodInfo) (bool, error), successMsg string) error {
@@ -291,46 +320,48 @@ func (sandbox *kubeClientSandbox) restoreEvictedPods(ctx context.Context) error
// First wait loop: Check that all evicted pods are cleared from the indexers. // First wait loop: Check that all evicted pods are cleared from the indexers.
// This ensures the eviction has fully propagated through the fake informer's indexer. // This ensures the eviction has fully propagated through the fake informer's indexer.
// We check both existence and UID match to handle cases where a pod was deleted and
// recreated with the same name but different UID.
if err := waitForPodsCondition(ctx, evictedPods, func(pod *evictedPodInfo) (bool, error) { if err := waitForPodsCondition(ctx, evictedPods, func(pod *evictedPodInfo) (bool, error) {
exists, err := sandbox.hasObjectInIndexer(v1.SchemeGroupVersion.WithResource("pods"), pod.Namespace, pod.Name) exists, err := sandbox.hasObjectInIndexer(v1.SchemeGroupVersion.WithResource("pods"), pod.Namespace, pod.Name, pod.UID)
if err != nil { if err != nil {
klog.V(4).InfoS("Error checking indexer for pod", "namespace", pod.Namespace, "name", pod.Name, "error", err) klog.V(4).InfoS("Error checking indexer for pod", "namespace", pod.Namespace, "name", pod.Name, "error", err)
return false, nil return false, nil
} }
if exists { if exists {
klog.V(4).InfoS("Pod still exists in fake indexer, waiting", "namespace", pod.Namespace, "name", pod.Name) klog.V(4).InfoS("Pod with matching UID still exists in fake indexer, waiting", "namespace", pod.Namespace, "name", pod.Name, "uid", pod.UID)
return false, nil return false, nil
} }
klog.V(4).InfoS("Pod no longer in fake indexer", "namespace", pod.Namespace, "name", pod.Name) klog.V(4).InfoS("Pod with matching UID no longer in fake indexer", "namespace", pod.Namespace, "name", pod.Name, "uid", pod.UID)
return true, nil return true, nil
}, "All evicted pods removed from fake indexer"); err != nil { }, "All evicted pods removed from fake indexer"); err != nil {
return fmt.Errorf("timeout waiting for evicted pods to be removed from fake indexer: %w", err) return fmt.Errorf("timeout waiting for evicted pods to be removed from fake indexer: %w", err)
} }
var restoredPods []*evictedPodInfo var restoredPods []*evictedPodInfo
for _, evictedPodInfo := range sandbox.evictedPodsCache.list() { for _, podInfo := range sandbox.evictedPodsCache.list() {
obj, err := podInformer.Lister().ByNamespace(evictedPodInfo.Namespace).Get(evictedPodInfo.Name) obj, err := podInformer.Lister().ByNamespace(podInfo.Namespace).Get(podInfo.Name)
if err != nil { if err != nil {
klog.V(3).InfoS("Pod not found in real client, skipping restoration", "namespace", evictedPodInfo.Namespace, "name", evictedPodInfo.Name, "error", err) klog.V(3).InfoS("Pod not found in real client, skipping restoration", "namespace", podInfo.Namespace, "name", podInfo.Name, "error", err)
continue continue
} }
pod, ok := obj.(*v1.Pod) pod, ok := obj.(*v1.Pod)
if !ok { if !ok {
klog.ErrorS(nil, "Object is not a pod", "namespace", evictedPodInfo.Namespace, "name", evictedPodInfo.Name) klog.ErrorS(nil, "Object is not a pod", "namespace", podInfo.Namespace, "name", podInfo.Name)
continue continue
} }
if string(pod.UID) != evictedPodInfo.UID { if string(pod.UID) != podInfo.UID {
klog.V(3).InfoS("Pod UID mismatch, skipping restoration", "namespace", evictedPodInfo.Namespace, "name", evictedPodInfo.Name, "expectedUID", evictedPodInfo.UID, "actualUID", string(pod.UID)) klog.V(3).InfoS("Pod UID mismatch, skipping restoration", "namespace", podInfo.Namespace, "name", podInfo.Name, "expectedUID", podInfo.UID, "actualUID", string(pod.UID))
continue continue
} }
if err := sandbox.fakeKubeClient.Tracker().Add(pod); err != nil && !apierrors.IsAlreadyExists(err) { if err = sandbox.fakeKubeClient.Tracker().Add(pod); err != nil && !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("failed to restore pod %s/%s to fake client: %w", evictedPodInfo.Namespace, evictedPodInfo.Name, err) return fmt.Errorf("failed to restore pod %s/%s to fake client: %w", podInfo.Namespace, podInfo.Name, err)
} }
klog.V(4).InfoS("Successfully restored pod to fake client", "namespace", evictedPodInfo.Namespace, "name", evictedPodInfo.Name, "uid", evictedPodInfo.UID) klog.V(4).InfoS("Successfully restored pod to fake client", "namespace", podInfo.Namespace, "name", podInfo.Name, "uid", podInfo.UID)
restoredPods = append(restoredPods, evictedPodInfo) restoredPods = append(restoredPods, podInfo)
} }
// Second wait loop: Make sure the evicted pods are added back to the fake client. // Second wait loop: Make sure the evicted pods are added back to the fake client.
@@ -339,6 +370,13 @@ func (sandbox *kubeClientSandbox) restoreEvictedPods(ctx context.Context) error
podObj, err := sandbox.fakeFactory.Core().V1().Pods().Lister().Pods(pod.Namespace).Get(pod.Name) podObj, err := sandbox.fakeFactory.Core().V1().Pods().Lister().Pods(pod.Namespace).Get(pod.Name)
if err != nil { if err != nil {
klog.V(4).InfoS("Pod not yet accessible in fake informer, waiting", "namespace", pod.Namespace, "name", pod.Name) klog.V(4).InfoS("Pod not yet accessible in fake informer, waiting", "namespace", pod.Namespace, "name", pod.Name)
shouldSkip, checkErr := sandbox.shouldSkipPodWait(ctx, pod)
if checkErr != nil {
return false, checkErr
}
if shouldSkip {
return true, nil
}
return false, nil return false, nil
} }
klog.V(4).InfoS("Pod accessible in fake informer", "namespace", pod.Namespace, "name", pod.Name, "node", podObj.Spec.NodeName) klog.V(4).InfoS("Pod accessible in fake informer", "namespace", pod.Namespace, "name", pod.Name, "node", podObj.Spec.NodeName)
@@ -351,13 +389,20 @@ func (sandbox *kubeClientSandbox) restoreEvictedPods(ctx context.Context) error
// This is important to ensure each descheduling cycle can see all the restored pods. // This is important to ensure each descheduling cycle can see all the restored pods.
// Without this wait, the next cycle might not see the restored pods in the indexer yet. // Without this wait, the next cycle might not see the restored pods in the indexer yet.
if err := waitForPodsCondition(ctx, restoredPods, func(pod *evictedPodInfo) (bool, error) { if err := waitForPodsCondition(ctx, restoredPods, func(pod *evictedPodInfo) (bool, error) {
exists, err := sandbox.hasObjectInIndexer(v1.SchemeGroupVersion.WithResource("pods"), pod.Namespace, pod.Name) exists, err := sandbox.hasObjectInIndexer(v1.SchemeGroupVersion.WithResource("pods"), pod.Namespace, pod.Name, pod.UID)
if err != nil { if err != nil {
klog.V(4).InfoS("Error checking indexer for restored pod", "namespace", pod.Namespace, "name", pod.Name, "error", err) klog.V(4).InfoS("Error checking indexer for restored pod", "namespace", pod.Namespace, "name", pod.Name, "error", err)
return false, nil return false, nil
} }
if !exists { if !exists {
klog.V(4).InfoS("Restored pod not yet in fake indexer, waiting", "namespace", pod.Namespace, "name", pod.Name) klog.V(4).InfoS("Restored pod not yet in fake indexer, waiting", "namespace", pod.Namespace, "name", pod.Name)
shouldSkip, checkErr := sandbox.shouldSkipPodWait(ctx, pod)
if checkErr != nil {
return false, checkErr
}
if shouldSkip {
return true, nil
}
return false, nil return false, nil
} }
klog.V(4).InfoS("Restored pod now in fake indexer", "namespace", pod.Namespace, "name", pod.Name) klog.V(4).InfoS("Restored pod now in fake indexer", "namespace", pod.Namespace, "name", pod.Name)

View File

@@ -125,13 +125,14 @@ func TestKubeClientSandboxEventHandlers(t *testing.T) {
} }
// Helper function to wait for a resource to appear in the sandbox's fake client indexer // Helper function to wait for a resource to appear in the sandbox's fake client indexer
waitForResourceInIndexer := func(resourceType, namespace, name, description string) error { waitForResourceInIndexer := func(resourceType, namespace, name, uid, description string) error {
t.Logf("Waiting for %s to appear in fake client indexer...", description) t.Logf("Waiting for %s to appear in fake client indexer...", description)
return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
exists, err := sandbox.hasObjectInIndexer( exists, err := sandbox.hasObjectInIndexer(
v1.SchemeGroupVersion.WithResource(resourceType), v1.SchemeGroupVersion.WithResource(resourceType),
namespace, namespace,
name, name,
uid,
) )
if err != nil { if err != nil {
t.Logf("Error checking %s in indexer: %v", description, err) t.Logf("Error checking %s in indexer: %v", description, err)
@@ -146,12 +147,12 @@ func TestKubeClientSandboxEventHandlers(t *testing.T) {
} }
// Wait for the pod to appear in the sandbox's fake client indexer // Wait for the pod to appear in the sandbox's fake client indexer
if err := waitForResourceInIndexer("pods", testPod.Namespace, testPod.Name, "pod"); err != nil { if err := waitForResourceInIndexer("pods", testPod.Namespace, testPod.Name, string(testPod.UID), "pod"); err != nil {
t.Fatalf("Pod did not appear in fake client indexer within timeout: %v", err) t.Fatalf("Pod did not appear in fake client indexer within timeout: %v", err)
} }
// Wait for the node to appear in the sandbox's fake client indexer // Wait for the node to appear in the sandbox's fake client indexer
if err := waitForResourceInIndexer("nodes", "", testNode.Name, "node"); err != nil { if err := waitForResourceInIndexer("nodes", "", testNode.Name, string(testNode.UID), "node"); err != nil {
t.Fatalf("Node did not appear in fake client indexer within timeout: %v", err) t.Fatalf("Node did not appear in fake client indexer within timeout: %v", err)
} }
@@ -269,6 +270,137 @@ func TestKubeClientSandboxReset(t *testing.T) {
t.Logf("Successfully verified cache is empty after reset") t.Logf("Successfully verified cache is empty after reset")
} }
func TestKubeClientSandboxRestoreEvictedPods(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
node1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
// Create test pods
pod1 := test.BuildTestPod("pod1", 100, 0, node1.Name, test.SetRSOwnerRef)
pod2 := test.BuildTestPod("pod2", 100, 0, node1.Name, test.SetRSOwnerRef)
pod3 := test.BuildTestPod("pod3", 100, 0, node1.Name, test.SetRSOwnerRef)
pod4 := test.BuildTestPod("pod4", 100, 0, node1.Name, test.SetRSOwnerRef)
sandbox, _, realClient := setupTestSandbox(ctx, t, node1, pod1, pod2, pod3, pod4)
// Evict all pods
for _, pod := range []*v1.Pod{pod1, pod2, pod3, pod4} {
eviction := &policy.Eviction{
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
},
}
if err := sandbox.fakeClient().CoreV1().Pods(pod.Namespace).EvictV1(ctx, eviction); err != nil {
t.Fatalf("Error evicting %s: %v", pod.Name, err)
}
}
// Delete pod2 from real client to simulate it being deleted (should skip restoration)
if err := realClient.CoreV1().Pods(pod2.Namespace).Delete(ctx, pod2.Name, metav1.DeleteOptions{}); err != nil {
t.Fatalf("Error deleting pod2 from real client: %v", err)
}
// Delete and recreate pod3 with different UID in real client (should skip restoration)
if err := realClient.CoreV1().Pods(pod3.Namespace).Delete(ctx, pod3.Name, metav1.DeleteOptions{}); err != nil {
t.Fatalf("Error deleting pod3 from real client: %v", err)
}
pod3New := test.BuildTestPod("pod3", 100, 0, node1.Name, test.SetRSOwnerRef)
// Ensure pod3New has a different UID from pod3
if pod3New.UID == pod3.UID {
pod3New.UID = "new-uid-for-pod3"
}
if _, err := realClient.CoreV1().Pods(pod3New.Namespace).Create(ctx, pod3New, metav1.CreateOptions{}); err != nil {
t.Fatalf("Error recreating pod3 in real client: %v", err)
}
// Verify evicted pods are in cache
evictedPods := sandbox.evictedPodsCache.list()
if len(evictedPods) != 4 {
t.Fatalf("Expected 4 evicted pods in cache, got %d", len(evictedPods))
}
t.Logf("Evicted pods in cache: %d", len(evictedPods))
// Call restoreEvictedPods
t.Log("Calling restoreEvictedPods...")
if err := sandbox.restoreEvictedPods(ctx); err != nil {
t.Fatalf("restoreEvictedPods failed: %v", err)
}
// Verify pod1 and pod4 were restored (exists in fake client with matching UID and accessible via indexer)
for _, pod := range []*v1.Pod{pod1, pod4} {
// Check restoration via fake client
restoredPod, err := sandbox.fakeClient().CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
if err != nil {
t.Errorf("%s should have been restored to fake client: %v", pod.Name, err)
} else {
if restoredPod.UID != pod.UID {
t.Errorf("Restored %s UID mismatch: got %s, want %s", pod.Name, restoredPod.UID, pod.UID)
}
t.Logf("Successfully verified %s restoration: %s/%s (UID: %s)", pod.Name, restoredPod.Namespace, restoredPod.Name, restoredPod.UID)
}
// Check accessibility via indexer
exists, err := sandbox.hasObjectInIndexer(v1.SchemeGroupVersion.WithResource("pods"), pod.Namespace, pod.Name, string(pod.UID))
if err != nil {
t.Errorf("Error checking %s in indexer: %v", pod.Name, err)
}
if !exists {
t.Errorf("%s should exist in fake indexer after restoration", pod.Name)
} else {
t.Logf("Successfully verified %s exists in fake indexer", pod.Name)
}
}
// Verify pod2 was NOT restored (deleted from real client)
_, err := sandbox.fakeClient().CoreV1().Pods(pod2.Namespace).Get(ctx, pod2.Name, metav1.GetOptions{})
if err == nil {
t.Error("pod2 should NOT have been restored (was deleted from real client)")
} else {
t.Logf("Correctly verified pod2 was not restored: %v", err)
}
// Verify pod3 was NOT restored with old UID (UID mismatch case)
// Note: pod3 may exist in fake client with NEW UID due to event handler syncing,
// but it should NOT have been restored with the OLD UID from evicted cache
pod3InFake, err := sandbox.fakeClient().CoreV1().Pods(pod3.Namespace).Get(ctx, pod3.Name, metav1.GetOptions{})
if err == nil {
// Pod3 exists, but it should have the NEW UID, not the old one
if pod3InFake.UID == pod3.UID {
t.Error("pod3 should NOT have been restored with old UID (UID mismatch should prevent restoration)")
} else {
t.Logf("Correctly verified pod3 has new UID (%s), not old UID (%s) - restoration was skipped", pod3InFake.UID, pod3.UID)
}
} else {
// Pod3 doesn't exist - this is also acceptable (event handlers haven't synced it yet)
t.Logf("pod3 not found in fake client: %v", err)
}
// Verify evicted pods cache is still intact (restoreEvictedPods doesn't clear it)
evictedPodsAfter := sandbox.evictedPodsCache.list()
if len(evictedPodsAfter) != 4 {
t.Errorf("Expected evicted pods cache to still have 4 entries, got %d", len(evictedPodsAfter))
}
}
func TestKubeClientSandboxRestoreEvictedPodsEmptyCache(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
node1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
sandbox, _, _ := setupTestSandbox(ctx, t, node1)
// Call restoreEvictedPods with empty cache - should be a no-op
t.Log("Calling restoreEvictedPods with empty cache...")
if err := sandbox.restoreEvictedPods(ctx); err != nil {
t.Fatalf("restoreEvictedPods should succeed with empty cache: %v", err)
}
t.Log("Successfully verified restoreEvictedPods handles empty cache")
}
func TestEvictedPodsCache(t *testing.T) { func TestEvictedPodsCache(t *testing.T) {
t.Run("add single pod", func(t *testing.T) { t.Run("add single pod", func(t *testing.T) {
const ( const (