mirror of https://github.com/kubernetes-sigs/descheduler.git (synced 2026-01-25 20:59:28 +01:00)
refactor(kubeClientSandbox): move the code under a separate file
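
For context, a rough sketch of how the moved sandbox is meant to be driven during a dry-run cycle (illustrative only, pieced together from the helpers in the new file below and from TestKubeClientSandboxReset; the actual call sites in descheduler.go may differ):

    // client and sharedInformerFactory are assumed to be already constructed and synced.
    sandbox, err := newDefaultKubeClientSandbox(client, sharedInformerFactory)
    if err != nil {
        return err
    }
    // The fake client and factory stand in for the real ones while evictions are evaluated.
    sandbox.fakeSharedInformerFactory().Start(ctx.Done())
    sandbox.fakeSharedInformerFactory().WaitForCacheSync(ctx.Done())

    // Evictions issued against sandbox.fakeClient() are intercepted by podEvictionReactionFnc:
    // the pod is recorded in the evicted-pods cache and deleted from the fake tracker.
    // After a cycle the evicted pods can be restored and the cache cleared for the next run.
    if err := sandbox.restoreEvictedPods(ctx); err != nil {
        return err
    }
    sandbox.reset()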
@@ -22,7 +22,6 @@ import (
    "math"
    "net/http"
    "strconv"
    "sync"
    "time"

    promapi "github.com/prometheus/client_golang/api"
@@ -31,15 +30,9 @@ import (
    "go.opentelemetry.io/otel/trace"

    v1 "k8s.io/api/core/v1"
    policy "k8s.io/api/policy/v1"
    policyv1 "k8s.io/api/policy/v1"
    schedulingv1 "k8s.io/api/scheduling/v1"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/api/meta"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    utilversion "k8s.io/apimachinery/pkg/util/version"
    "k8s.io/apimachinery/pkg/util/wait"
@@ -49,7 +42,6 @@ import (
    fakeclientset "k8s.io/client-go/kubernetes/fake"
    corev1listers "k8s.io/client-go/listers/core/v1"
    "k8s.io/client-go/rest"
    core "k8s.io/client-go/testing"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/events"
    "k8s.io/client-go/util/workqueue"
@@ -86,52 +78,6 @@ type profileRunner struct {
    descheduleEPs, balanceEPs eprunner
}

// evictedPodInfo stores identifying information about a pod that was evicted during dry-run mode
type evictedPodInfo struct {
    Namespace string
    Name      string
    UID       string
}

// evictedPodsCache is a thread-safe cache for tracking pods evicted during dry-run mode
type evictedPodsCache struct {
    sync.RWMutex
    pods map[string]*evictedPodInfo
}

func newEvictedPodsCache() *evictedPodsCache {
    return &evictedPodsCache{
        pods: make(map[string]*evictedPodInfo),
    }
}

func (c *evictedPodsCache) add(pod *v1.Pod) {
    c.Lock()
    defer c.Unlock()
    c.pods[string(pod.UID)] = &evictedPodInfo{
        Namespace: pod.Namespace,
        Name:      pod.Name,
        UID:       string(pod.UID),
    }
}

func (c *evictedPodsCache) list() []*evictedPodInfo {
    c.RLock()
    defer c.RUnlock()
    pods := make([]*evictedPodInfo, 0, len(c.pods))
    for _, pod := range c.pods {
        podCopy := *pod
        pods = append(pods, &podCopy)
    }
    return pods
}

func (c *evictedPodsCache) clear() {
    c.Lock()
    defer c.Unlock()
    c.pods = make(map[string]*evictedPodInfo)
}

type descheduler struct {
    rs     *options.DeschedulerServer
    client clientset.Interface
@@ -150,289 +96,6 @@ type descheduler struct {
    metricsProviders map[api.MetricsSource]*api.MetricsProvider
}

// kubeClientSandbox creates a sandbox environment with a fake client and informer factory
// that mirrors resources from a real client, useful for dry-run testing scenarios
type kubeClientSandbox struct {
    fakeKubeClient         *fakeclientset.Clientset
    fakeFactory            informers.SharedInformerFactory
    resourceToInformer     map[schema.GroupVersionResource]informers.GenericInformer
    evictedPodsCache       *evictedPodsCache
    podEvictionReactionFnc func(*fakeclientset.Clientset, *evictedPodsCache) func(action core.Action) (bool, runtime.Object, error)
}

func newDefaultKubeClientSandbox(client clientset.Interface, sharedInformerFactory informers.SharedInformerFactory) (*kubeClientSandbox, error) {
    return newKubeClientSandbox(client, sharedInformerFactory,
        v1.SchemeGroupVersion.WithResource("pods"),
        v1.SchemeGroupVersion.WithResource("nodes"),
        v1.SchemeGroupVersion.WithResource("namespaces"),
        schedulingv1.SchemeGroupVersion.WithResource("priorityclasses"),
        policyv1.SchemeGroupVersion.WithResource("poddisruptionbudgets"),
        v1.SchemeGroupVersion.WithResource("persistentvolumeclaims"),
    )
}

func newKubeClientSandbox(client clientset.Interface, sharedInformerFactory informers.SharedInformerFactory, resources ...schema.GroupVersionResource) (*kubeClientSandbox, error) {
    sandbox := &kubeClientSandbox{
        resourceToInformer:     make(map[schema.GroupVersionResource]informers.GenericInformer),
        evictedPodsCache:       newEvictedPodsCache(),
        podEvictionReactionFnc: podEvictionReactionFnc,
    }

    sandbox.fakeKubeClient = fakeclientset.NewSimpleClientset()
    // simulate a pod eviction by deleting a pod
    sandbox.fakeKubeClient.PrependReactor("create", "pods", sandbox.podEvictionReactionFnc(sandbox.fakeKubeClient, sandbox.evictedPodsCache))
    sandbox.fakeFactory = informers.NewSharedInformerFactory(sandbox.fakeKubeClient, 0)

    for _, resource := range resources {
        informer, err := sharedInformerFactory.ForResource(resource)
        if err != nil {
            return nil, err
        }
        sandbox.resourceToInformer[resource] = informer
    }

    // Register event handlers to sync changes from real client to fake client.
    // These handlers will keep the fake client in sync with ongoing changes.
    if err := sandbox.registerEventHandlers(); err != nil {
        return nil, fmt.Errorf("error registering event handlers: %w", err)
    }

    return sandbox, nil
}

func (sandbox *kubeClientSandbox) registerEventHandlers() error {
    for resource, informer := range sandbox.resourceToInformer {
        // Create a local copy to avoid closure capture issue
        resource := resource

        _, err := sandbox.fakeFactory.ForResource(resource)
        if err != nil {
            return fmt.Errorf("error getting resource %s for fake factory: %w", resource, err)
        }

        _, err = informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc: func(obj interface{}) {
                runtimeObj, ok := obj.(runtime.Object)
                if !ok {
                    klog.ErrorS(nil, "object is not a runtime.Object", "resource", resource)
                    return
                }
                if err := sandbox.fakeKubeClient.Tracker().Add(runtimeObj); err != nil {
                    if !apierrors.IsAlreadyExists(err) {
                        klog.ErrorS(err, "failed to add object to fake client", "resource", resource)
                    }
                }
            },
            UpdateFunc: func(oldObj, newObj interface{}) {
                runtimeObj, ok := newObj.(runtime.Object)
                if !ok {
                    klog.ErrorS(nil, "object is not a runtime.Object", "resource", resource)
                    return
                }
                metaObj, err := meta.Accessor(runtimeObj)
                if err != nil {
                    klog.ErrorS(err, "failed to get object metadata", "resource", resource)
                    return
                }
                if err := sandbox.fakeKubeClient.Tracker().Update(resource, runtimeObj, metaObj.GetNamespace()); err != nil {
                    klog.ErrorS(err, "failed to update object in fake client", "resource", resource)
                }
            },
            DeleteFunc: func(obj interface{}) {
                // Handle tombstone case where the object might be wrapped
                if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
                    obj = tombstone.Obj
                }

                runtimeObj, ok := obj.(runtime.Object)
                if !ok {
                    klog.ErrorS(nil, "object is not a runtime.Object", "resource", resource)
                    return
                }
                metaObj, err := meta.Accessor(runtimeObj)
                if err != nil {
                    klog.ErrorS(err, "failed to get object metadata", "resource", resource)
                    return
                }
                if err := sandbox.fakeKubeClient.Tracker().Delete(resource, metaObj.GetNamespace(), metaObj.GetName()); err != nil {
                    klog.ErrorS(err, "failed to delete object from fake client", "resource", resource)
                }
            },
        })
        if err != nil {
            return fmt.Errorf("error adding event handler for resource %s: %w", resource, err)
        }
    }
    return nil
}

func (sandbox *kubeClientSandbox) fakeClient() *fakeclientset.Clientset {
    return sandbox.fakeKubeClient
}

func (sandbox *kubeClientSandbox) fakeSharedInformerFactory() informers.SharedInformerFactory {
    return sandbox.fakeFactory
}

func (sandbox *kubeClientSandbox) reset() {
    sandbox.evictedPodsCache.clear()
}

// hasObjectInIndexer checks if an object exists in the fake indexer for the specified resource
func (sandbox *kubeClientSandbox) hasObjectInIndexer(resource schema.GroupVersionResource, namespace, name string) (bool, error) {
    informer, err := sandbox.fakeFactory.ForResource(resource)
    if err != nil {
        return false, fmt.Errorf("error getting informer for resource %s: %w", resource, err)
    }

    key := cache.MetaObjectToName(&metav1.ObjectMeta{Namespace: namespace, Name: name}).String()
    _, exists, err := informer.Informer().GetIndexer().GetByKey(key)
    if err != nil {
        return false, err
    }

    return exists, nil
}

// hasRuntimeObjectInIndexer checks if a runtime.Object exists in the fake indexer by detecting its resource type
func (sandbox *kubeClientSandbox) hasRuntimeObjectInIndexer(obj runtime.Object) (bool, error) {
    // Get metadata accessor to extract namespace and name
    metaObj, err := meta.Accessor(obj)
    if err != nil {
        return false, fmt.Errorf("failed to get object metadata: %w", err)
    }

    // Get the GVK from the object using TypeMeta
    gvk := obj.GetObjectKind().GroupVersionKind()
    if gvk.Empty() {
        return false, fmt.Errorf("no GroupVersionKind found for object")
    }

    // Use the GVK to construct the GVR by pluralizing the kind
    plural, _ := meta.UnsafeGuessKindToResource(gvk)
    gvr := schema.GroupVersionResource{
        Group:    gvk.Group,
        Version:  gvk.Version,
        Resource: plural.Resource,
    }

    return sandbox.hasObjectInIndexer(gvr, metaObj.GetNamespace(), metaObj.GetName())
}

func waitForPodsCondition(ctx context.Context, pods []*evictedPodInfo, checkFn func(*evictedPodInfo) (bool, error), successMsg string) error {
    if len(pods) == 0 {
        return nil
    }

    err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
        for _, pod := range pods {
            satisfied, err := checkFn(pod)
            if err != nil {
                return false, err
            }
            if !satisfied {
                return false, nil
            }
        }
        return true, nil
    })
    if err != nil {
        return err
    }

    klog.V(4).InfoS(successMsg)
    return nil
}

// restoreEvictedPods restores pods from the evicted pods cache back to the fake client
func (sandbox *kubeClientSandbox) restoreEvictedPods(ctx context.Context) error {
    podInformer, ok := sandbox.resourceToInformer[v1.SchemeGroupVersion.WithResource("pods")]
    if !ok {
        return fmt.Errorf("pod informer not found in resourceToInformer")
    }

    evictedPods := sandbox.evictedPodsCache.list()

    // First wait loop: Check that all evicted pods are cleared from the indexers.
    // This ensures the eviction has fully propagated through the fake informer's indexer.
    if err := waitForPodsCondition(ctx, evictedPods, func(pod *evictedPodInfo) (bool, error) {
        exists, err := sandbox.hasObjectInIndexer(v1.SchemeGroupVersion.WithResource("pods"), pod.Namespace, pod.Name)
        if err != nil {
            klog.V(4).InfoS("Error checking indexer for pod", "namespace", pod.Namespace, "name", pod.Name, "error", err)
            return false, nil
        }
        if exists {
            klog.V(4).InfoS("Pod still exists in fake indexer, waiting", "namespace", pod.Namespace, "name", pod.Name)
            return false, nil
        }
        klog.V(4).InfoS("Pod no longer in fake indexer", "namespace", pod.Namespace, "name", pod.Name)
        return true, nil
    }, "All evicted pods removed from fake indexer"); err != nil {
        return fmt.Errorf("timeout waiting for evicted pods to be removed from fake indexer: %w", err)
    }

    var restoredPods []*evictedPodInfo
    for _, evictedPodInfo := range sandbox.evictedPodsCache.list() {
        obj, err := podInformer.Lister().ByNamespace(evictedPodInfo.Namespace).Get(evictedPodInfo.Name)
        if err != nil {
            klog.V(3).InfoS("Pod not found in real client, skipping restoration", "namespace", evictedPodInfo.Namespace, "name", evictedPodInfo.Name, "error", err)
            continue
        }

        pod, ok := obj.(*v1.Pod)
        if !ok {
            klog.ErrorS(nil, "Object is not a pod", "namespace", evictedPodInfo.Namespace, "name", evictedPodInfo.Name)
            continue
        }

        if string(pod.UID) != evictedPodInfo.UID {
            klog.V(3).InfoS("Pod UID mismatch, skipping restoration", "namespace", evictedPodInfo.Namespace, "name", evictedPodInfo.Name, "expectedUID", evictedPodInfo.UID, "actualUID", string(pod.UID))
            continue
        }

        if err := sandbox.fakeKubeClient.Tracker().Add(pod); err != nil && !apierrors.IsAlreadyExists(err) {
            return fmt.Errorf("failed to restore pod %s/%s to fake client: %w", evictedPodInfo.Namespace, evictedPodInfo.Name, err)
        }
        klog.V(4).InfoS("Successfully restored pod to fake client", "namespace", evictedPodInfo.Namespace, "name", evictedPodInfo.Name, "uid", evictedPodInfo.UID)
        restoredPods = append(restoredPods, evictedPodInfo)
    }

    // Second wait loop: Make sure the evicted pods are added back to the fake client.
    // This ensures the restored pods are accessible through the fake informer's lister.
    if err := waitForPodsCondition(ctx, restoredPods, func(pod *evictedPodInfo) (bool, error) {
        podObj, err := sandbox.fakeFactory.Core().V1().Pods().Lister().Pods(pod.Namespace).Get(pod.Name)
        if err != nil {
            klog.V(4).InfoS("Pod not yet accessible in fake informer, waiting", "namespace", pod.Namespace, "name", pod.Name)
            return false, nil
        }
        klog.V(4).InfoS("Pod accessible in fake informer", "namespace", pod.Namespace, "name", pod.Name, "node", podObj.Spec.NodeName)
        return true, nil
    }, "All restored pods are accessible in fake informer"); err != nil {
        return fmt.Errorf("timeout waiting for pods to be accessible in fake informer: %w", err)
    }

    // Third wait loop: Make sure the indexers see the added pods.
    // This is important to ensure each descheduling cycle can see all the restored pods.
    // Without this wait, the next cycle might not see the restored pods in the indexer yet.
    if err := waitForPodsCondition(ctx, restoredPods, func(pod *evictedPodInfo) (bool, error) {
        exists, err := sandbox.hasObjectInIndexer(v1.SchemeGroupVersion.WithResource("pods"), pod.Namespace, pod.Name)
        if err != nil {
            klog.V(4).InfoS("Error checking indexer for restored pod", "namespace", pod.Namespace, "name", pod.Name, "error", err)
            return false, nil
        }
        if !exists {
            klog.V(4).InfoS("Restored pod not yet in fake indexer, waiting", "namespace", pod.Namespace, "name", pod.Name)
            return false, nil
        }
        klog.V(4).InfoS("Restored pod now in fake indexer", "namespace", pod.Namespace, "name", pod.Name)
        return true, nil
    }, "All restored pods are now in fake indexer"); err != nil {
        return fmt.Errorf("timeout waiting for restored pods to appear in fake indexer: %w", err)
    }

    return nil
}

func nodeSelectorFromPolicy(deschedulerPolicy *api.DeschedulerPolicy) (labels.Selector, error) {
    nodeSelector := labels.Everything()
    if deschedulerPolicy.NodeSelector != nil {
@@ -846,37 +509,6 @@ func validateVersionCompatibility(discovery discovery.DiscoveryInterface, desche
    return nil
}

func podEvictionReactionFnc(fakeClient *fakeclientset.Clientset, evictedCache *evictedPodsCache) func(action core.Action) (bool, runtime.Object, error) {
    return func(action core.Action) (bool, runtime.Object, error) {
        if action.GetSubresource() == "eviction" {
            createAct, matched := action.(core.CreateActionImpl)
            if !matched {
                return false, nil, fmt.Errorf("unable to convert action to core.CreateActionImpl")
            }
            eviction, matched := createAct.Object.(*policy.Eviction)
            if !matched {
                return false, nil, fmt.Errorf("unable to convert action object into *policy.Eviction")
            }
            podObj, err := fakeClient.Tracker().Get(action.GetResource(), eviction.GetNamespace(), eviction.GetName())
            if err == nil {
                if pod, ok := podObj.(*v1.Pod); ok {
                    evictedCache.add(pod)
                } else {
                    return false, nil, fmt.Errorf("unable to convert object to *v1.Pod for %v/%v", eviction.GetNamespace(), eviction.GetName())
                }
            } else if !apierrors.IsNotFound(err) {
                return false, nil, fmt.Errorf("unable to get pod %v/%v: %v", eviction.GetNamespace(), eviction.GetName(), err)
            }
            if err := fakeClient.Tracker().Delete(action.GetResource(), eviction.GetNamespace(), eviction.GetName()); err != nil {
                return false, nil, fmt.Errorf("unable to delete pod %v/%v: %v", eviction.GetNamespace(), eviction.GetName(), err)
            }
            return true, nil, nil
        }
        // fallback to the default reactor
        return false, nil, nil
    }
}

type tokenReconciliation int

const (

@@ -1148,257 +1148,6 @@ func TestLoadAwareDescheduling(t *testing.T) {
    t.Logf("Total evictions: %v", totalEs)
}

func TestKubeClientSandboxReset(t *testing.T) {
    ctx := context.Background()
    node1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
    p1 := test.BuildTestPod("p1", 100, 0, node1.Name, test.SetRSOwnerRef)
    p2 := test.BuildTestPod("p2", 100, 0, node1.Name, test.SetRSOwnerRef)

    client := fakeclientset.NewSimpleClientset(node1, p1, p2)
    sharedInformerFactory := informers.NewSharedInformerFactoryWithOptions(client, 0)

    // Explicitly get the informers to ensure they're registered
    _ = sharedInformerFactory.Core().V1().Pods().Informer()
    _ = sharedInformerFactory.Core().V1().Nodes().Informer()

    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    sharedInformerFactory.Start(ctx.Done())
    sharedInformerFactory.WaitForCacheSync(ctx.Done())

    sandbox, err := newKubeClientSandbox(client, sharedInformerFactory,
        v1.SchemeGroupVersion.WithResource("pods"),
        v1.SchemeGroupVersion.WithResource("nodes"),
    )
    if err != nil {
        t.Fatalf("Failed to create kubeClientSandbox: %v", err)
    }

    eviction1 := &policy.Eviction{
        ObjectMeta: metav1.ObjectMeta{
            Name:      p1.Name,
            Namespace: p1.Namespace,
        },
    }
    eviction2 := &policy.Eviction{
        ObjectMeta: metav1.ObjectMeta{
            Name:      p2.Name,
            Namespace: p2.Namespace,
        },
    }

    sandbox.fakeSharedInformerFactory().Start(ctx.Done())
    sandbox.fakeSharedInformerFactory().WaitForCacheSync(ctx.Done())

    if err := sandbox.fakeClient().CoreV1().Pods(p1.Namespace).EvictV1(context.TODO(), eviction1); err != nil {
        t.Fatalf("Error evicting p1: %v", err)
    }
    if err := sandbox.fakeClient().CoreV1().Pods(p2.Namespace).EvictV1(context.TODO(), eviction2); err != nil {
        t.Fatalf("Error evicting p2: %v", err)
    }

    evictedPods := sandbox.evictedPodsCache.list()
    if len(evictedPods) != 2 {
        t.Fatalf("Expected 2 evicted pods in cache, but got %d", len(evictedPods))
    }
    t.Logf("Evicted pods in cache before reset: %d", len(evictedPods))

    for _, evictedPod := range evictedPods {
        if evictedPod.Namespace == "" || evictedPod.Name == "" || evictedPod.UID == "" {
            t.Errorf("Evicted pod has empty fields: namespace=%s, name=%s, uid=%s", evictedPod.Namespace, evictedPod.Name, evictedPod.UID)
        }
        t.Logf("Evicted pod: %s/%s (UID: %s)", evictedPod.Namespace, evictedPod.Name, evictedPod.UID)
    }

    sandbox.reset()

    evictedPodsAfterReset := sandbox.evictedPodsCache.list()
    if len(evictedPodsAfterReset) != 0 {
        t.Fatalf("Expected cache to be empty after reset, but found %d pods", len(evictedPodsAfterReset))
    }
    t.Logf("Successfully verified cache is empty after reset")
}

func TestEvictedPodsCache(t *testing.T) {
    t.Run("add single pod", func(t *testing.T) {
        const (
            podName      = "pod1"
            podNamespace = "default"
            podUID       = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
        )
        cache := newEvictedPodsCache()
        pod := &v1.Pod{
            ObjectMeta: metav1.ObjectMeta{
                Name:      podName,
                Namespace: podNamespace,
                UID:       podUID,
            },
        }

        cache.add(pod)

        pods := cache.list()
        if len(pods) != 1 {
            t.Fatalf("Expected 1 pod in cache, got %d", len(pods))
        }
        if pods[0].Name != podName || pods[0].Namespace != podNamespace || pods[0].UID != podUID {
            t.Errorf("Pod data mismatch: got name=%s, namespace=%s, uid=%s", pods[0].Name, pods[0].Namespace, pods[0].UID)
        }
    })

    t.Run("add multiple pods", func(t *testing.T) {
        cache := newEvictedPodsCache()
        pods := []*v1.Pod{
            {ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default", UID: "11111111-1111-1111-1111-111111111111"}},
            {ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "kube-system", UID: "22222222-2222-2222-2222-222222222222"}},
            {ObjectMeta: metav1.ObjectMeta{Name: "pod3", Namespace: "default", UID: "33333333-3333-3333-3333-333333333333"}},
        }

        for _, pod := range pods {
            cache.add(pod)
        }

        cachedPods := cache.list()
        if len(cachedPods) != 3 {
            t.Fatalf("Expected 3 pods in cache, got %d", len(cachedPods))
        }

        podMap := make(map[string]*evictedPodInfo)
        for _, cachedPod := range cachedPods {
            podMap[cachedPod.UID] = cachedPod
        }

        for _, pod := range pods {
            cached, ok := podMap[string(pod.UID)]
            if !ok {
                t.Errorf("Pod with UID %s not found in cache", pod.UID)
                continue
            }
            if cached.Name != pod.Name || cached.Namespace != pod.Namespace {
                t.Errorf("Pod data mismatch for UID %s: got name=%s, namespace=%s", pod.UID, cached.Name, cached.Namespace)
            }
        }
    })

    t.Run("add duplicate pod updates entry", func(t *testing.T) {
        const (
            duplicateUID   = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
            updatedPodName = "pod1-new"
            updatedPodNS   = "kube-system"
        )
        cache := newEvictedPodsCache()
        pod1 := &v1.Pod{
            ObjectMeta: metav1.ObjectMeta{
                Name:      "pod1",
                Namespace: "default",
                UID:       duplicateUID,
            },
        }
        pod2 := &v1.Pod{
            ObjectMeta: metav1.ObjectMeta{
                Name:      updatedPodName,
                Namespace: updatedPodNS,
                UID:       duplicateUID,
            },
        }

        cache.add(pod1)
        cache.add(pod2)

        pods := cache.list()
        if len(pods) != 1 {
            t.Fatalf("Expected 1 pod in cache (duplicates should overwrite), got %d", len(pods))
        }
        if pods[0].Name != updatedPodName || pods[0].Namespace != updatedPodNS {
            t.Errorf("Expected pod2 data, got name=%s, namespace=%s", pods[0].Name, pods[0].Namespace)
        }
    })

    t.Run("list returns empty array for empty cache", func(t *testing.T) {
        cache := newEvictedPodsCache()
        pods := cache.list()
        if pods == nil {
            t.Fatal("Expected non-nil slice from list()")
        }
        if len(pods) != 0 {
            t.Fatalf("Expected empty list, got %d pods", len(pods))
        }
    })

    t.Run("list returns copies not references", func(t *testing.T) {
        const originalPodName = "pod1"
        cache := newEvictedPodsCache()
        pod := &v1.Pod{
            ObjectMeta: metav1.ObjectMeta{
                Name:      originalPodName,
                Namespace: "default",
                UID:       "12345678-1234-1234-1234-123456789abc",
            },
        }
        cache.add(pod)

        pods1 := cache.list()
        pods2 := cache.list()

        if len(pods1) != 1 || len(pods2) != 1 {
            t.Fatalf("Expected 1 pod in both lists")
        }

        pods1[0].Name = "modified"

        if pods2[0].Name == "modified" {
            t.Error("Modifying list result should not affect other list results (should be copies)")
        }

        pods3 := cache.list()
        if pods3[0].Name != originalPodName {
            t.Error("Cache data was modified, list() should return copies")
        }
    })

    t.Run("clear empties the cache", func(t *testing.T) {
        cache := newEvictedPodsCache()
        cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default", UID: "aaaa0000-0000-0000-0000-000000000001"}})
        cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "kube-system", UID: "bbbb0000-0000-0000-0000-000000000002"}})

        if len(cache.list()) != 2 {
            t.Fatal("Expected 2 pods before clear")
        }

        cache.clear()

        pods := cache.list()
        if len(pods) != 0 {
            t.Fatalf("Expected empty cache after clear, got %d pods", len(pods))
        }
    })

    t.Run("clear on empty cache is safe", func(t *testing.T) {
        cache := newEvictedPodsCache()
        cache.clear()

        pods := cache.list()
        if len(pods) != 0 {
            t.Fatalf("Expected empty cache, got %d pods", len(pods))
        }
    })

    t.Run("add after clear works correctly", func(t *testing.T) {
        cache := newEvictedPodsCache()
        cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default", UID: "00000001-0001-0001-0001-000000000001"}})
        cache.clear()
        cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "kube-system", UID: "00000002-0002-0002-0002-000000000002"}})

        pods := cache.list()
        if len(pods) != 1 {
            t.Fatalf("Expected 1 pod after clear and add, got %d", len(pods))
        }
        if pods[0].Name != "pod2" {
            t.Errorf("Expected pod2, got %s", pods[0].Name)
        }
    })
}

func TestPodEvictionReactionFncErrorHandling(t *testing.T) {
    podsGVR := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
pkg/descheduler/kubeclientsandbox.go (new file, 401 lines)
@@ -0,0 +1,401 @@
/*
Copyright 2026 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package descheduler

import (
    "context"
    "fmt"
    "sync"
    "time"

    v1 "k8s.io/api/core/v1"
    policy "k8s.io/api/policy/v1"
    policyv1 "k8s.io/api/policy/v1"
    schedulingv1 "k8s.io/api/scheduling/v1"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/api/meta"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/informers"
    clientset "k8s.io/client-go/kubernetes"
    fakeclientset "k8s.io/client-go/kubernetes/fake"
    core "k8s.io/client-go/testing"
    "k8s.io/client-go/tools/cache"
    "k8s.io/klog/v2"
)

// evictedPodInfo stores identifying information about a pod that was evicted during dry-run mode
type evictedPodInfo struct {
    Namespace string
    Name      string
    UID       string
}

// evictedPodsCache is a thread-safe cache for tracking pods evicted during dry-run mode
type evictedPodsCache struct {
    sync.RWMutex
    pods map[string]*evictedPodInfo
}

func newEvictedPodsCache() *evictedPodsCache {
    return &evictedPodsCache{
        pods: make(map[string]*evictedPodInfo),
    }
}

func (c *evictedPodsCache) add(pod *v1.Pod) {
    c.Lock()
    defer c.Unlock()
    c.pods[string(pod.UID)] = &evictedPodInfo{
        Namespace: pod.Namespace,
        Name:      pod.Name,
        UID:       string(pod.UID),
    }
}

func (c *evictedPodsCache) list() []*evictedPodInfo {
    c.RLock()
    defer c.RUnlock()
    pods := make([]*evictedPodInfo, 0, len(c.pods))
    for _, pod := range c.pods {
        podCopy := *pod
        pods = append(pods, &podCopy)
    }
    return pods
}

func (c *evictedPodsCache) clear() {
    c.Lock()
    defer c.Unlock()
    c.pods = make(map[string]*evictedPodInfo)
}

// kubeClientSandbox creates a sandbox environment with a fake client and informer factory
// that mirrors resources from a real client, useful for dry-run testing scenarios
type kubeClientSandbox struct {
    fakeKubeClient         *fakeclientset.Clientset
    fakeFactory            informers.SharedInformerFactory
    resourceToInformer     map[schema.GroupVersionResource]informers.GenericInformer
    evictedPodsCache       *evictedPodsCache
    podEvictionReactionFnc func(*fakeclientset.Clientset, *evictedPodsCache) func(action core.Action) (bool, runtime.Object, error)
}

func newDefaultKubeClientSandbox(client clientset.Interface, sharedInformerFactory informers.SharedInformerFactory) (*kubeClientSandbox, error) {
    return newKubeClientSandbox(client, sharedInformerFactory,
        v1.SchemeGroupVersion.WithResource("pods"),
        v1.SchemeGroupVersion.WithResource("nodes"),
        v1.SchemeGroupVersion.WithResource("namespaces"),
        schedulingv1.SchemeGroupVersion.WithResource("priorityclasses"),
        policyv1.SchemeGroupVersion.WithResource("poddisruptionbudgets"),
        v1.SchemeGroupVersion.WithResource("persistentvolumeclaims"),
    )
}

func newKubeClientSandbox(client clientset.Interface, sharedInformerFactory informers.SharedInformerFactory, resources ...schema.GroupVersionResource) (*kubeClientSandbox, error) {
    sandbox := &kubeClientSandbox{
        resourceToInformer:     make(map[schema.GroupVersionResource]informers.GenericInformer),
        evictedPodsCache:       newEvictedPodsCache(),
        podEvictionReactionFnc: podEvictionReactionFnc,
    }

    sandbox.fakeKubeClient = fakeclientset.NewSimpleClientset()
    // simulate a pod eviction by deleting a pod
    sandbox.fakeKubeClient.PrependReactor("create", "pods", sandbox.podEvictionReactionFnc(sandbox.fakeKubeClient, sandbox.evictedPodsCache))
    sandbox.fakeFactory = informers.NewSharedInformerFactory(sandbox.fakeKubeClient, 0)

    for _, resource := range resources {
        informer, err := sharedInformerFactory.ForResource(resource)
        if err != nil {
            return nil, err
        }
        sandbox.resourceToInformer[resource] = informer
    }

    // Register event handlers to sync changes from real client to fake client.
    // These handlers will keep the fake client in sync with ongoing changes.
    if err := sandbox.registerEventHandlers(); err != nil {
        return nil, fmt.Errorf("error registering event handlers: %w", err)
    }

    return sandbox, nil
}

func (sandbox *kubeClientSandbox) registerEventHandlers() error {
    for resource, informer := range sandbox.resourceToInformer {
        // Create a local copy to avoid closure capture issue
        resource := resource

        _, err := sandbox.fakeFactory.ForResource(resource)
        if err != nil {
            return fmt.Errorf("error getting resource %s for fake factory: %w", resource, err)
        }

        _, err = informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc: func(obj interface{}) {
                runtimeObj, ok := obj.(runtime.Object)
                if !ok {
                    klog.ErrorS(nil, "object is not a runtime.Object", "resource", resource)
                    return
                }
                if err := sandbox.fakeKubeClient.Tracker().Add(runtimeObj); err != nil {
                    if !apierrors.IsAlreadyExists(err) {
                        klog.ErrorS(err, "failed to add object to fake client", "resource", resource)
                    }
                }
            },
            UpdateFunc: func(oldObj, newObj interface{}) {
                runtimeObj, ok := newObj.(runtime.Object)
                if !ok {
                    klog.ErrorS(nil, "object is not a runtime.Object", "resource", resource)
                    return
                }
                metaObj, err := meta.Accessor(runtimeObj)
                if err != nil {
                    klog.ErrorS(err, "failed to get object metadata", "resource", resource)
                    return
                }
                if err := sandbox.fakeKubeClient.Tracker().Update(resource, runtimeObj, metaObj.GetNamespace()); err != nil {
                    klog.ErrorS(err, "failed to update object in fake client", "resource", resource)
                }
            },
            DeleteFunc: func(obj interface{}) {
                // Handle tombstone case where the object might be wrapped
                if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
                    obj = tombstone.Obj
                }

                runtimeObj, ok := obj.(runtime.Object)
                if !ok {
                    klog.ErrorS(nil, "object is not a runtime.Object", "resource", resource)
                    return
                }
                metaObj, err := meta.Accessor(runtimeObj)
                if err != nil {
                    klog.ErrorS(err, "failed to get object metadata", "resource", resource)
                    return
                }
                if err := sandbox.fakeKubeClient.Tracker().Delete(resource, metaObj.GetNamespace(), metaObj.GetName()); err != nil {
                    klog.ErrorS(err, "failed to delete object from fake client", "resource", resource)
                }
            },
        })
        if err != nil {
            return fmt.Errorf("error adding event handler for resource %s: %w", resource, err)
        }
    }
    return nil
}

func (sandbox *kubeClientSandbox) fakeClient() *fakeclientset.Clientset {
    return sandbox.fakeKubeClient
}

func (sandbox *kubeClientSandbox) fakeSharedInformerFactory() informers.SharedInformerFactory {
    return sandbox.fakeFactory
}

func (sandbox *kubeClientSandbox) reset() {
    sandbox.evictedPodsCache.clear()
}

// hasObjectInIndexer checks if an object exists in the fake indexer for the specified resource
func (sandbox *kubeClientSandbox) hasObjectInIndexer(resource schema.GroupVersionResource, namespace, name string) (bool, error) {
    informer, err := sandbox.fakeFactory.ForResource(resource)
    if err != nil {
        return false, fmt.Errorf("error getting informer for resource %s: %w", resource, err)
    }

    key := cache.MetaObjectToName(&metav1.ObjectMeta{Namespace: namespace, Name: name}).String()
    _, exists, err := informer.Informer().GetIndexer().GetByKey(key)
    if err != nil {
        return false, err
    }

    return exists, nil
}

// hasRuntimeObjectInIndexer checks if a runtime.Object exists in the fake indexer by detecting its resource type
func (sandbox *kubeClientSandbox) hasRuntimeObjectInIndexer(obj runtime.Object) (bool, error) {
    // Get metadata accessor to extract namespace and name
    metaObj, err := meta.Accessor(obj)
    if err != nil {
        return false, fmt.Errorf("failed to get object metadata: %w", err)
    }

    // Get the GVK from the object using TypeMeta
    gvk := obj.GetObjectKind().GroupVersionKind()
    if gvk.Empty() {
        return false, fmt.Errorf("no GroupVersionKind found for object")
    }

    // Use the GVK to construct the GVR by pluralizing the kind
    plural, _ := meta.UnsafeGuessKindToResource(gvk)
    gvr := schema.GroupVersionResource{
        Group:    gvk.Group,
        Version:  gvk.Version,
        Resource: plural.Resource,
    }

    return sandbox.hasObjectInIndexer(gvr, metaObj.GetNamespace(), metaObj.GetName())
}

func waitForPodsCondition(ctx context.Context, pods []*evictedPodInfo, checkFn func(*evictedPodInfo) (bool, error), successMsg string) error {
    if len(pods) == 0 {
        return nil
    }

    err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
        for _, pod := range pods {
            satisfied, err := checkFn(pod)
            if err != nil {
                return false, err
            }
            if !satisfied {
                return false, nil
            }
        }
        return true, nil
    })
    if err != nil {
        return err
    }

    klog.V(4).InfoS(successMsg)
    return nil
}

// restoreEvictedPods restores pods from the evicted pods cache back to the fake client
func (sandbox *kubeClientSandbox) restoreEvictedPods(ctx context.Context) error {
    podInformer, ok := sandbox.resourceToInformer[v1.SchemeGroupVersion.WithResource("pods")]
    if !ok {
        return fmt.Errorf("pod informer not found in resourceToInformer")
    }

    evictedPods := sandbox.evictedPodsCache.list()

    // First wait loop: Check that all evicted pods are cleared from the indexers.
    // This ensures the eviction has fully propagated through the fake informer's indexer.
    if err := waitForPodsCondition(ctx, evictedPods, func(pod *evictedPodInfo) (bool, error) {
        exists, err := sandbox.hasObjectInIndexer(v1.SchemeGroupVersion.WithResource("pods"), pod.Namespace, pod.Name)
        if err != nil {
            klog.V(4).InfoS("Error checking indexer for pod", "namespace", pod.Namespace, "name", pod.Name, "error", err)
            return false, nil
        }
        if exists {
            klog.V(4).InfoS("Pod still exists in fake indexer, waiting", "namespace", pod.Namespace, "name", pod.Name)
            return false, nil
        }
        klog.V(4).InfoS("Pod no longer in fake indexer", "namespace", pod.Namespace, "name", pod.Name)
        return true, nil
    }, "All evicted pods removed from fake indexer"); err != nil {
        return fmt.Errorf("timeout waiting for evicted pods to be removed from fake indexer: %w", err)
    }

    var restoredPods []*evictedPodInfo
    for _, evictedPodInfo := range sandbox.evictedPodsCache.list() {
        obj, err := podInformer.Lister().ByNamespace(evictedPodInfo.Namespace).Get(evictedPodInfo.Name)
        if err != nil {
            klog.V(3).InfoS("Pod not found in real client, skipping restoration", "namespace", evictedPodInfo.Namespace, "name", evictedPodInfo.Name, "error", err)
            continue
        }

        pod, ok := obj.(*v1.Pod)
        if !ok {
            klog.ErrorS(nil, "Object is not a pod", "namespace", evictedPodInfo.Namespace, "name", evictedPodInfo.Name)
            continue
        }

        if string(pod.UID) != evictedPodInfo.UID {
            klog.V(3).InfoS("Pod UID mismatch, skipping restoration", "namespace", evictedPodInfo.Namespace, "name", evictedPodInfo.Name, "expectedUID", evictedPodInfo.UID, "actualUID", string(pod.UID))
            continue
        }

        if err := sandbox.fakeKubeClient.Tracker().Add(pod); err != nil && !apierrors.IsAlreadyExists(err) {
            return fmt.Errorf("failed to restore pod %s/%s to fake client: %w", evictedPodInfo.Namespace, evictedPodInfo.Name, err)
        }
        klog.V(4).InfoS("Successfully restored pod to fake client", "namespace", evictedPodInfo.Namespace, "name", evictedPodInfo.Name, "uid", evictedPodInfo.UID)
        restoredPods = append(restoredPods, evictedPodInfo)
    }

    // Second wait loop: Make sure the evicted pods are added back to the fake client.
    // This ensures the restored pods are accessible through the fake informer's lister.
    if err := waitForPodsCondition(ctx, restoredPods, func(pod *evictedPodInfo) (bool, error) {
        podObj, err := sandbox.fakeFactory.Core().V1().Pods().Lister().Pods(pod.Namespace).Get(pod.Name)
        if err != nil {
            klog.V(4).InfoS("Pod not yet accessible in fake informer, waiting", "namespace", pod.Namespace, "name", pod.Name)
            return false, nil
        }
        klog.V(4).InfoS("Pod accessible in fake informer", "namespace", pod.Namespace, "name", pod.Name, "node", podObj.Spec.NodeName)
        return true, nil
    }, "All restored pods are accessible in fake informer"); err != nil {
        return fmt.Errorf("timeout waiting for pods to be accessible in fake informer: %w", err)
    }

    // Third wait loop: Make sure the indexers see the added pods.
    // This is important to ensure each descheduling cycle can see all the restored pods.
    // Without this wait, the next cycle might not see the restored pods in the indexer yet.
    if err := waitForPodsCondition(ctx, restoredPods, func(pod *evictedPodInfo) (bool, error) {
        exists, err := sandbox.hasObjectInIndexer(v1.SchemeGroupVersion.WithResource("pods"), pod.Namespace, pod.Name)
        if err != nil {
            klog.V(4).InfoS("Error checking indexer for restored pod", "namespace", pod.Namespace, "name", pod.Name, "error", err)
            return false, nil
        }
        if !exists {
            klog.V(4).InfoS("Restored pod not yet in fake indexer, waiting", "namespace", pod.Namespace, "name", pod.Name)
            return false, nil
        }
        klog.V(4).InfoS("Restored pod now in fake indexer", "namespace", pod.Namespace, "name", pod.Name)
        return true, nil
    }, "All restored pods are now in fake indexer"); err != nil {
        return fmt.Errorf("timeout waiting for restored pods to appear in fake indexer: %w", err)
    }

    return nil
}

func podEvictionReactionFnc(fakeClient *fakeclientset.Clientset, evictedCache *evictedPodsCache) func(action core.Action) (bool, runtime.Object, error) {
    return func(action core.Action) (bool, runtime.Object, error) {
        if action.GetSubresource() == "eviction" {
            createAct, matched := action.(core.CreateActionImpl)
            if !matched {
                return false, nil, fmt.Errorf("unable to convert action to core.CreateActionImpl")
            }
            eviction, matched := createAct.Object.(*policy.Eviction)
            if !matched {
                return false, nil, fmt.Errorf("unable to convert action object into *policy.Eviction")
            }
            podObj, err := fakeClient.Tracker().Get(action.GetResource(), eviction.GetNamespace(), eviction.GetName())
            if err == nil {
                if pod, ok := podObj.(*v1.Pod); ok {
                    evictedCache.add(pod)
                } else {
                    return false, nil, fmt.Errorf("unable to convert object to *v1.Pod for %v/%v", eviction.GetNamespace(), eviction.GetName())
                }
            } else if !apierrors.IsNotFound(err) {
                return false, nil, fmt.Errorf("unable to get pod %v/%v: %v", eviction.GetNamespace(), eviction.GetName(), err)
            }
            if err := fakeClient.Tracker().Delete(action.GetResource(), eviction.GetNamespace(), eviction.GetName()); err != nil {
                return false, nil, fmt.Errorf("unable to delete pod %v/%v: %v", eviction.GetNamespace(), eviction.GetName(), err)
            }
            return true, nil, nil
        }
        // fallback to the default reactor
        return false, nil, nil
    }
}
pkg/descheduler/kubeclientsandbox_test.go (new file, 281 lines)
@@ -0,0 +1,281 @@
/*
Copyright 2026 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package descheduler

import (
    "context"
    "testing"

    v1 "k8s.io/api/core/v1"
    policy "k8s.io/api/policy/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/informers"
    fakeclientset "k8s.io/client-go/kubernetes/fake"

    "sigs.k8s.io/descheduler/test"
)

func TestKubeClientSandboxReset(t *testing.T) {
    ctx := context.Background()
    node1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
    p1 := test.BuildTestPod("p1", 100, 0, node1.Name, test.SetRSOwnerRef)
    p2 := test.BuildTestPod("p2", 100, 0, node1.Name, test.SetRSOwnerRef)

    client := fakeclientset.NewSimpleClientset(node1, p1, p2)
    sharedInformerFactory := informers.NewSharedInformerFactoryWithOptions(client, 0)

    // Explicitly get the informers to ensure they're registered
    _ = sharedInformerFactory.Core().V1().Pods().Informer()
    _ = sharedInformerFactory.Core().V1().Nodes().Informer()

    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    sharedInformerFactory.Start(ctx.Done())
    sharedInformerFactory.WaitForCacheSync(ctx.Done())

    sandbox, err := newKubeClientSandbox(client, sharedInformerFactory,
        v1.SchemeGroupVersion.WithResource("pods"),
        v1.SchemeGroupVersion.WithResource("nodes"),
    )
    if err != nil {
        t.Fatalf("Failed to create kubeClientSandbox: %v", err)
    }

    eviction1 := &policy.Eviction{
        ObjectMeta: metav1.ObjectMeta{
            Name:      p1.Name,
            Namespace: p1.Namespace,
        },
    }
    eviction2 := &policy.Eviction{
        ObjectMeta: metav1.ObjectMeta{
            Name:      p2.Name,
            Namespace: p2.Namespace,
        },
    }

    sandbox.fakeSharedInformerFactory().Start(ctx.Done())
    sandbox.fakeSharedInformerFactory().WaitForCacheSync(ctx.Done())

    if err := sandbox.fakeClient().CoreV1().Pods(p1.Namespace).EvictV1(context.TODO(), eviction1); err != nil {
        t.Fatalf("Error evicting p1: %v", err)
    }
    if err := sandbox.fakeClient().CoreV1().Pods(p2.Namespace).EvictV1(context.TODO(), eviction2); err != nil {
        t.Fatalf("Error evicting p2: %v", err)
    }

    evictedPods := sandbox.evictedPodsCache.list()
    if len(evictedPods) != 2 {
        t.Fatalf("Expected 2 evicted pods in cache, but got %d", len(evictedPods))
    }
    t.Logf("Evicted pods in cache before reset: %d", len(evictedPods))

    for _, evictedPod := range evictedPods {
        if evictedPod.Namespace == "" || evictedPod.Name == "" || evictedPod.UID == "" {
            t.Errorf("Evicted pod has empty fields: namespace=%s, name=%s, uid=%s", evictedPod.Namespace, evictedPod.Name, evictedPod.UID)
        }
        t.Logf("Evicted pod: %s/%s (UID: %s)", evictedPod.Namespace, evictedPod.Name, evictedPod.UID)
    }

    sandbox.reset()

    evictedPodsAfterReset := sandbox.evictedPodsCache.list()
    if len(evictedPodsAfterReset) != 0 {
        t.Fatalf("Expected cache to be empty after reset, but found %d pods", len(evictedPodsAfterReset))
    }
    t.Logf("Successfully verified cache is empty after reset")
}

func TestEvictedPodsCache(t *testing.T) {
    t.Run("add single pod", func(t *testing.T) {
        const (
            podName      = "pod1"
            podNamespace = "default"
            podUID       = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
        )
        cache := newEvictedPodsCache()
        pod := &v1.Pod{
            ObjectMeta: metav1.ObjectMeta{
                Name:      podName,
                Namespace: podNamespace,
                UID:       podUID,
            },
        }

        cache.add(pod)

        pods := cache.list()
        if len(pods) != 1 {
            t.Fatalf("Expected 1 pod in cache, got %d", len(pods))
        }
        if pods[0].Name != podName || pods[0].Namespace != podNamespace || pods[0].UID != podUID {
            t.Errorf("Pod data mismatch: got name=%s, namespace=%s, uid=%s", pods[0].Name, pods[0].Namespace, pods[0].UID)
        }
    })

    t.Run("add multiple pods", func(t *testing.T) {
        cache := newEvictedPodsCache()
        pods := []*v1.Pod{
            {ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default", UID: "11111111-1111-1111-1111-111111111111"}},
            {ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "kube-system", UID: "22222222-2222-2222-2222-222222222222"}},
            {ObjectMeta: metav1.ObjectMeta{Name: "pod3", Namespace: "default", UID: "33333333-3333-3333-3333-333333333333"}},
        }

        for _, pod := range pods {
            cache.add(pod)
        }

        cachedPods := cache.list()
        if len(cachedPods) != 3 {
            t.Fatalf("Expected 3 pods in cache, got %d", len(cachedPods))
        }

        podMap := make(map[string]*evictedPodInfo)
        for _, cachedPod := range cachedPods {
            podMap[cachedPod.UID] = cachedPod
        }

        for _, pod := range pods {
            cached, ok := podMap[string(pod.UID)]
            if !ok {
                t.Errorf("Pod with UID %s not found in cache", pod.UID)
                continue
            }
            if cached.Name != pod.Name || cached.Namespace != pod.Namespace {
                t.Errorf("Pod data mismatch for UID %s: got name=%s, namespace=%s", pod.UID, cached.Name, cached.Namespace)
            }
        }
    })

    t.Run("add duplicate pod updates entry", func(t *testing.T) {
        const (
            duplicateUID   = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
            updatedPodName = "pod1-new"
            updatedPodNS   = "kube-system"
        )
        cache := newEvictedPodsCache()
        pod1 := &v1.Pod{
            ObjectMeta: metav1.ObjectMeta{
                Name:      "pod1",
                Namespace: "default",
                UID:       duplicateUID,
            },
        }
        pod2 := &v1.Pod{
            ObjectMeta: metav1.ObjectMeta{
                Name:      updatedPodName,
                Namespace: updatedPodNS,
                UID:       duplicateUID,
            },
        }

        cache.add(pod1)
        cache.add(pod2)

        pods := cache.list()
        if len(pods) != 1 {
            t.Fatalf("Expected 1 pod in cache (duplicates should overwrite), got %d", len(pods))
        }
        if pods[0].Name != updatedPodName || pods[0].Namespace != updatedPodNS {
            t.Errorf("Expected pod2 data, got name=%s, namespace=%s", pods[0].Name, pods[0].Namespace)
        }
    })

    t.Run("list returns empty array for empty cache", func(t *testing.T) {
        cache := newEvictedPodsCache()
        pods := cache.list()
        if pods == nil {
            t.Fatal("Expected non-nil slice from list()")
        }
        if len(pods) != 0 {
            t.Fatalf("Expected empty list, got %d pods", len(pods))
        }
    })

    t.Run("list returns copies not references", func(t *testing.T) {
        const originalPodName = "pod1"
        cache := newEvictedPodsCache()
        pod := &v1.Pod{
            ObjectMeta: metav1.ObjectMeta{
                Name:      originalPodName,
                Namespace: "default",
                UID:       "12345678-1234-1234-1234-123456789abc",
            },
        }
        cache.add(pod)

        pods1 := cache.list()
        pods2 := cache.list()

        if len(pods1) != 1 || len(pods2) != 1 {
            t.Fatalf("Expected 1 pod in both lists")
        }

        pods1[0].Name = "modified"

        if pods2[0].Name == "modified" {
            t.Error("Modifying list result should not affect other list results (should be copies)")
        }

        pods3 := cache.list()
        if pods3[0].Name != originalPodName {
            t.Error("Cache data was modified, list() should return copies")
        }
    })

    t.Run("clear empties the cache", func(t *testing.T) {
        cache := newEvictedPodsCache()
        cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default", UID: "aaaa0000-0000-0000-0000-000000000001"}})
        cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "kube-system", UID: "bbbb0000-0000-0000-0000-000000000002"}})

        if len(cache.list()) != 2 {
            t.Fatal("Expected 2 pods before clear")
        }

        cache.clear()

        pods := cache.list()
        if len(pods) != 0 {
            t.Fatalf("Expected empty cache after clear, got %d pods", len(pods))
        }
    })

    t.Run("clear on empty cache is safe", func(t *testing.T) {
        cache := newEvictedPodsCache()
        cache.clear()

        pods := cache.list()
        if len(pods) != 0 {
            t.Fatalf("Expected empty cache, got %d pods", len(pods))
        }
    })

    t.Run("add after clear works correctly", func(t *testing.T) {
        cache := newEvictedPodsCache()
        cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default", UID: "00000001-0001-0001-0001-000000000001"}})
        cache.clear()
        cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "kube-system", UID: "00000002-0002-0002-0002-000000000002"}})

        pods := cache.list()
        if len(pods) != 1 {
            t.Fatalf("Expected 1 pod after clear and add, got %d", len(pods))
        }
        if pods[0].Name != "pod2" {
            t.Errorf("Expected pod2, got %s", pods[0].Name)
        }
    })
}