1
0
mirror of https://github.com/kubernetes-sigs/descheduler.git synced 2026-01-25 20:59:28 +01:00

refactor(pkg/descheduler): create fake shared informer factory only once

This commit is contained in:
Jan Chaloupka
2026-01-22 17:21:13 +01:00
parent 38d99dd0c3
commit 770ec5affa
4 changed files with 504 additions and 131 deletions

View File

@@ -36,6 +36,7 @@ import (
schedulingv1 "k8s.io/api/scheduling/v1" schedulingv1 "k8s.io/api/scheduling/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
@@ -133,6 +134,7 @@ func (c *evictedPodsCache) clear() {
type descheduler struct { type descheduler struct {
rs *options.DeschedulerServer rs *options.DeschedulerServer
client clientset.Interface
kubeClientSandbox *kubeClientSandbox kubeClientSandbox *kubeClientSandbox
getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc
sharedInformerFactory informers.SharedInformerFactory sharedInformerFactory informers.SharedInformerFactory
@@ -151,8 +153,6 @@ type descheduler struct {
// kubeClientSandbox creates a sandbox environment with a fake client and informer factory // kubeClientSandbox creates a sandbox environment with a fake client and informer factory
// that mirrors resources from a real client, useful for dry-run testing scenarios // that mirrors resources from a real client, useful for dry-run testing scenarios
type kubeClientSandbox struct { type kubeClientSandbox struct {
client clientset.Interface
sharedInformerFactory informers.SharedInformerFactory
fakeKubeClient *fakeclientset.Clientset fakeKubeClient *fakeclientset.Clientset
fakeFactory informers.SharedInformerFactory fakeFactory informers.SharedInformerFactory
resourceToInformer map[schema.GroupVersionResource]informers.GenericInformer resourceToInformer map[schema.GroupVersionResource]informers.GenericInformer
@@ -173,13 +173,16 @@ func newDefaultKubeClientSandbox(client clientset.Interface, sharedInformerFacto
func newKubeClientSandbox(client clientset.Interface, sharedInformerFactory informers.SharedInformerFactory, resources ...schema.GroupVersionResource) (*kubeClientSandbox, error) { func newKubeClientSandbox(client clientset.Interface, sharedInformerFactory informers.SharedInformerFactory, resources ...schema.GroupVersionResource) (*kubeClientSandbox, error) {
sandbox := &kubeClientSandbox{ sandbox := &kubeClientSandbox{
client: client,
sharedInformerFactory: sharedInformerFactory,
resourceToInformer: make(map[schema.GroupVersionResource]informers.GenericInformer), resourceToInformer: make(map[schema.GroupVersionResource]informers.GenericInformer),
evictedPodsCache: newEvictedPodsCache(), evictedPodsCache: newEvictedPodsCache(),
podEvictionReactionFnc: podEvictionReactionFnc, podEvictionReactionFnc: podEvictionReactionFnc,
} }
sandbox.fakeKubeClient = fakeclientset.NewSimpleClientset()
// simulate a pod eviction by deleting a pod
sandbox.fakeKubeClient.PrependReactor("create", "pods", sandbox.podEvictionReactionFnc(sandbox.fakeKubeClient, sandbox.evictedPodsCache))
sandbox.fakeFactory = informers.NewSharedInformerFactory(sandbox.fakeKubeClient, 0)
for _, resource := range resources { for _, resource := range resources {
informer, err := sharedInformerFactory.ForResource(resource) informer, err := sharedInformerFactory.ForResource(resource)
if err != nil { if err != nil {
@@ -188,33 +191,78 @@ func newKubeClientSandbox(client clientset.Interface, sharedInformerFactory info
sandbox.resourceToInformer[resource] = informer sandbox.resourceToInformer[resource] = informer
} }
// Register event handlers to sync changes from real client to fake client.
// These handlers will keep the fake client in sync with ongoing changes.
if err := sandbox.registerEventHandlers(); err != nil {
return nil, fmt.Errorf("error registering event handlers: %w", err)
}
return sandbox, nil return sandbox, nil
} }
func (sandbox *kubeClientSandbox) buildSandbox() error { func (sandbox *kubeClientSandbox) registerEventHandlers() error {
sandbox.fakeKubeClient = fakeclientset.NewSimpleClientset()
// simulate a pod eviction by deleting a pod
sandbox.fakeKubeClient.PrependReactor("create", "pods", sandbox.podEvictionReactionFnc(sandbox.fakeKubeClient, sandbox.evictedPodsCache))
sandbox.fakeFactory = informers.NewSharedInformerFactory(sandbox.fakeKubeClient, 0)
for resource, informer := range sandbox.resourceToInformer { for resource, informer := range sandbox.resourceToInformer {
// Create a local copy to avoid closure capture issue
resource := resource
_, err := sandbox.fakeFactory.ForResource(resource) _, err := sandbox.fakeFactory.ForResource(resource)
if err != nil { if err != nil {
return fmt.Errorf("error getting resource %s: %w", resource, err) return fmt.Errorf("error getting resource %s for fake factory: %w", resource, err)
} }
objects, err := informer.Lister().List(labels.Everything()) _, err = informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
runtimeObj, ok := obj.(runtime.Object)
if !ok {
klog.ErrorS(nil, "object is not a runtime.Object", "resource", resource)
return
}
if err := sandbox.fakeKubeClient.Tracker().Add(runtimeObj); err != nil {
if !apierrors.IsAlreadyExists(err) {
klog.ErrorS(err, "failed to add object to fake client", "resource", resource)
}
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
runtimeObj, ok := newObj.(runtime.Object)
if !ok {
klog.ErrorS(nil, "object is not a runtime.Object", "resource", resource)
return
}
metaObj, err := meta.Accessor(runtimeObj)
if err != nil {
klog.ErrorS(err, "failed to get object metadata", "resource", resource)
return
}
if err := sandbox.fakeKubeClient.Tracker().Update(resource, runtimeObj, metaObj.GetNamespace()); err != nil {
klog.ErrorS(err, "failed to update object in fake client", "resource", resource)
}
},
DeleteFunc: func(obj interface{}) {
// Handle tombstone case where the object might be wrapped
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = tombstone.Obj
}
runtimeObj, ok := obj.(runtime.Object)
if !ok {
klog.ErrorS(nil, "object is not a runtime.Object", "resource", resource)
return
}
metaObj, err := meta.Accessor(runtimeObj)
if err != nil {
klog.ErrorS(err, "failed to get object metadata", "resource", resource)
return
}
if err := sandbox.fakeKubeClient.Tracker().Delete(resource, metaObj.GetNamespace(), metaObj.GetName()); err != nil {
klog.ErrorS(err, "failed to delete object from fake client", "resource", resource)
}
},
})
if err != nil { if err != nil {
return fmt.Errorf("error listing %s: %w", informer, err) return fmt.Errorf("error adding event handler for resource %s: %w", resource, err)
}
for _, object := range objects {
if err := sandbox.fakeKubeClient.Tracker().Add(object); err != nil {
return fmt.Errorf("error adding object to tracker: %w", err)
}
} }
} }
return nil return nil
} }
@@ -230,6 +278,161 @@ func (sandbox *kubeClientSandbox) reset() {
sandbox.evictedPodsCache.clear() sandbox.evictedPodsCache.clear()
} }
// hasObjectInIndexer checks if an object exists in the fake indexer for the specified resource
func (sandbox *kubeClientSandbox) hasObjectInIndexer(resource schema.GroupVersionResource, namespace, name string) (bool, error) {
informer, err := sandbox.fakeFactory.ForResource(resource)
if err != nil {
return false, fmt.Errorf("error getting informer for resource %s: %w", resource, err)
}
key := cache.MetaObjectToName(&metav1.ObjectMeta{Namespace: namespace, Name: name}).String()
_, exists, err := informer.Informer().GetIndexer().GetByKey(key)
if err != nil {
return false, err
}
return exists, nil
}
// hasRuntimeObjectInIndexer checks if a runtime.Object exists in the fake indexer by detecting its resource type
func (sandbox *kubeClientSandbox) hasRuntimeObjectInIndexer(obj runtime.Object) (bool, error) {
// Get metadata accessor to extract namespace and name
metaObj, err := meta.Accessor(obj)
if err != nil {
return false, fmt.Errorf("failed to get object metadata: %w", err)
}
// Get the GVK from the object using TypeMeta
gvk := obj.GetObjectKind().GroupVersionKind()
if gvk.Empty() {
return false, fmt.Errorf("no GroupVersionKind found for object")
}
// Use the GVK to construct the GVR by pluralizing the kind
plural, _ := meta.UnsafeGuessKindToResource(gvk)
gvr := schema.GroupVersionResource{
Group: gvk.Group,
Version: gvk.Version,
Resource: plural.Resource,
}
return sandbox.hasObjectInIndexer(gvr, metaObj.GetNamespace(), metaObj.GetName())
}
func waitForPodsCondition(ctx context.Context, pods []*evictedPodInfo, checkFn func(*evictedPodInfo) (bool, error), successMsg string) error {
if len(pods) == 0 {
return nil
}
err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
for _, pod := range pods {
satisfied, err := checkFn(pod)
if err != nil {
return false, err
}
if !satisfied {
return false, nil
}
}
return true, nil
})
if err != nil {
return err
}
klog.V(4).InfoS(successMsg)
return nil
}
// restoreEvictedPods restores pods from the evicted pods cache back to the fake client
func (sandbox *kubeClientSandbox) restoreEvictedPods(ctx context.Context) error {
podInformer, ok := sandbox.resourceToInformer[v1.SchemeGroupVersion.WithResource("pods")]
if !ok {
return fmt.Errorf("pod informer not found in resourceToInformer")
}
evictedPods := sandbox.evictedPodsCache.list()
// First wait loop: Check that all evicted pods are cleared from the indexers.
// This ensures the eviction has fully propagated through the fake informer's indexer.
if err := waitForPodsCondition(ctx, evictedPods, func(pod *evictedPodInfo) (bool, error) {
exists, err := sandbox.hasObjectInIndexer(v1.SchemeGroupVersion.WithResource("pods"), pod.Namespace, pod.Name)
if err != nil {
klog.V(4).InfoS("Error checking indexer for pod", "namespace", pod.Namespace, "name", pod.Name, "error", err)
return false, nil
}
if exists {
klog.V(4).InfoS("Pod still exists in fake indexer, waiting", "namespace", pod.Namespace, "name", pod.Name)
return false, nil
}
klog.V(4).InfoS("Pod no longer in fake indexer", "namespace", pod.Namespace, "name", pod.Name)
return true, nil
}, "All evicted pods removed from fake indexer"); err != nil {
return fmt.Errorf("timeout waiting for evicted pods to be removed from fake indexer: %w", err)
}
var restoredPods []*evictedPodInfo
for _, evictedPodInfo := range sandbox.evictedPodsCache.list() {
obj, err := podInformer.Lister().ByNamespace(evictedPodInfo.Namespace).Get(evictedPodInfo.Name)
if err != nil {
klog.V(3).InfoS("Pod not found in real client, skipping restoration", "namespace", evictedPodInfo.Namespace, "name", evictedPodInfo.Name, "error", err)
continue
}
pod, ok := obj.(*v1.Pod)
if !ok {
klog.ErrorS(nil, "Object is not a pod", "namespace", evictedPodInfo.Namespace, "name", evictedPodInfo.Name)
continue
}
if string(pod.UID) != evictedPodInfo.UID {
klog.V(3).InfoS("Pod UID mismatch, skipping restoration", "namespace", evictedPodInfo.Namespace, "name", evictedPodInfo.Name, "expectedUID", evictedPodInfo.UID, "actualUID", string(pod.UID))
continue
}
if err := sandbox.fakeKubeClient.Tracker().Add(pod); err != nil && !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("failed to restore pod %s/%s to fake client: %w", evictedPodInfo.Namespace, evictedPodInfo.Name, err)
}
klog.V(4).InfoS("Successfully restored pod to fake client", "namespace", evictedPodInfo.Namespace, "name", evictedPodInfo.Name, "uid", evictedPodInfo.UID)
restoredPods = append(restoredPods, evictedPodInfo)
}
// Second wait loop: Make sure the evicted pods are added back to the fake client.
// This ensures the restored pods are accessible through the fake informer's lister.
if err := waitForPodsCondition(ctx, restoredPods, func(pod *evictedPodInfo) (bool, error) {
podObj, err := sandbox.fakeFactory.Core().V1().Pods().Lister().Pods(pod.Namespace).Get(pod.Name)
if err != nil {
klog.V(4).InfoS("Pod not yet accessible in fake informer, waiting", "namespace", pod.Namespace, "name", pod.Name)
return false, nil
}
klog.V(4).InfoS("Pod accessible in fake informer", "namespace", pod.Namespace, "name", pod.Name, "node", podObj.Spec.NodeName)
return true, nil
}, "All restored pods are accessible in fake informer"); err != nil {
return fmt.Errorf("timeout waiting for pods to be accessible in fake informer: %w", err)
}
// Third wait loop: Make sure the indexers see the added pods.
// This is important to ensure each descheduling cycle can see all the restored pods.
// Without this wait, the next cycle might not see the restored pods in the indexer yet.
if err := waitForPodsCondition(ctx, restoredPods, func(pod *evictedPodInfo) (bool, error) {
exists, err := sandbox.hasObjectInIndexer(v1.SchemeGroupVersion.WithResource("pods"), pod.Namespace, pod.Name)
if err != nil {
klog.V(4).InfoS("Error checking indexer for restored pod", "namespace", pod.Namespace, "name", pod.Name, "error", err)
return false, nil
}
if !exists {
klog.V(4).InfoS("Restored pod not yet in fake indexer, waiting", "namespace", pod.Namespace, "name", pod.Name)
return false, nil
}
klog.V(4).InfoS("Restored pod now in fake indexer", "namespace", pod.Namespace, "name", pod.Name)
return true, nil
}, "All restored pods are now in fake indexer"); err != nil {
return fmt.Errorf("timeout waiting for restored pods to appear in fake indexer: %w", err)
}
return nil
}
func nodeSelectorFromPolicy(deschedulerPolicy *api.DeschedulerPolicy) (labels.Selector, error) { func nodeSelectorFromPolicy(deschedulerPolicy *api.DeschedulerPolicy) (labels.Selector, error) {
nodeSelector := labels.Everything() nodeSelector := labels.Everything()
if deschedulerPolicy.NodeSelector != nil { if deschedulerPolicy.NodeSelector != nil {
@@ -246,30 +449,6 @@ func addNodeSelectorIndexer(sharedInformerFactory informers.SharedInformerFactor
return nodeutil.AddNodeSelectorIndexer(sharedInformerFactory.Core().V1().Nodes().Informer(), indexerNodeSelectorGlobal, nodeSelector) return nodeutil.AddNodeSelectorIndexer(sharedInformerFactory.Core().V1().Nodes().Informer(), indexerNodeSelectorGlobal, nodeSelector)
} }
func setupInformerIndexers(sharedInformerFactory informers.SharedInformerFactory, deschedulerPolicy *api.DeschedulerPolicy) (podutil.GetPodsAssignedToNodeFunc, error) {
// create a new instance of the shared informer factory from the cached client
// register the pod informer, otherwise it will not get running
getPodsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(sharedInformerFactory.Core().V1().Pods().Informer())
if err != nil {
return nil, fmt.Errorf("build get pods assigned to node function error: %v", err)
}
// TODO(ingvagabund): copy paste all relevant indexers from the real client to the fake one
// TODO(ingvagabund): register one indexer per each profile. Respect the precedence of no profile-level node selector is specified.
// Also, keep a cache of node label selectors to detect duplicates to avoid creating an extra informer.
nodeSelector, err := nodeSelectorFromPolicy(deschedulerPolicy)
if err != nil {
return nil, err
}
if err := addNodeSelectorIndexer(sharedInformerFactory, nodeSelector); err != nil {
return nil, err
}
return getPodsAssignedToNode, nil
}
func metricsProviderListToMap(providersList []api.MetricsProvider) map[api.MetricsSource]*api.MetricsProvider { func metricsProviderListToMap(providersList []api.MetricsProvider) map[api.MetricsSource]*api.MetricsProvider {
providersMap := make(map[api.MetricsSource]*api.MetricsProvider) providersMap := make(map[api.MetricsSource]*api.MetricsProvider)
for _, provider := range providersList { for _, provider := range providersList {
@@ -295,15 +474,11 @@ func setupPrometheusProvider(d *descheduler, namespacedSharedInformerFactory inf
return nil return nil
} }
func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string, eventRecorder events.EventRecorder, sharedInformerFactory, namespacedSharedInformerFactory informers.SharedInformerFactory) (*descheduler, error) { func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string, eventRecorder events.EventRecorder, client clientset.Interface, sharedInformerFactory informers.SharedInformerFactory, kubeClientSandbox *kubeClientSandbox) (*descheduler, error) {
podInformer := sharedInformerFactory.Core().V1().Pods().Informer() podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
// Temporarily register the PVC because it is used by the DefaultEvictor plugin during
// Future work could be to let each plugin declare what type of resources it needs; that way dry runs would stay // the descheduling cycle, where informer registration is ignored.
// consistent with the real runs without having to keep the list here in sync. _ = sharedInformerFactory.Core().V1().PersistentVolumeClaims().Informer()
kubeClientSandbox, err := newDefaultKubeClientSandbox(rs.Client, sharedInformerFactory)
if err != nil {
return nil, fmt.Errorf("failed to create kube client sandbox: %v", err)
}
getPodsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer) getPodsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer)
if err != nil { if err != nil {
@@ -312,7 +487,7 @@ func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedu
podEvictor, err := evictions.NewPodEvictor( podEvictor, err := evictions.NewPodEvictor(
ctx, ctx,
rs.Client, client,
eventRecorder, eventRecorder,
podInformer, podInformer,
rs.DefaultFeatureGates, rs.DefaultFeatureGates,
@@ -332,6 +507,7 @@ func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedu
desch := &descheduler{ desch := &descheduler{
rs: rs, rs: rs,
client: client,
kubeClientSandbox: kubeClientSandbox, kubeClientSandbox: kubeClientSandbox,
getPodsAssignedToNode: getPodsAssignedToNode, getPodsAssignedToNode: getPodsAssignedToNode,
sharedInformerFactory: sharedInformerFactory, sharedInformerFactory: sharedInformerFactory,
@@ -479,42 +655,20 @@ func (d *descheduler) runDeschedulerLoop(ctx context.Context) error {
metrics.LoopDuration.With(map[string]string{}).Observe(time.Since(loopStartDuration).Seconds()) metrics.LoopDuration.With(map[string]string{}).Observe(time.Since(loopStartDuration).Seconds())
}(time.Now()) }(time.Now())
var client clientset.Interface klog.V(3).Infof("Resetting pod evictor counters")
// When the dry mode is enable, collect all the relevant objects (mostly pods) under a fake client.
// So when evicting pods while running multiple strategies in a row have the cummulative effect
// as is when evicting pods for real.
if d.rs.DryRun {
klog.V(3).Infof("Building a cached client from the cluster for the dry run")
// Create a new cache so we start from scratch without any leftovers
err := d.kubeClientSandbox.buildSandbox()
if err != nil {
return err
}
getPodsAssignedToNode, err := setupInformerIndexers(d.kubeClientSandbox.fakeSharedInformerFactory(), d.deschedulerPolicy)
if err != nil {
return err
}
d.getPodsAssignedToNode = getPodsAssignedToNode
fakeCtx, cncl := context.WithCancel(context.TODO())
defer cncl()
d.kubeClientSandbox.fakeSharedInformerFactory().Start(fakeCtx.Done())
d.kubeClientSandbox.fakeSharedInformerFactory().WaitForCacheSync(fakeCtx.Done())
client = d.kubeClientSandbox.fakeClient()
d.sharedInformerFactory = d.kubeClientSandbox.fakeSharedInformerFactory()
} else {
client = d.rs.Client
}
klog.V(3).Infof("Setting up the pod evictor")
d.podEvictor.SetClient(client)
d.podEvictor.ResetCounters() d.podEvictor.ResetCounters()
d.runProfiles(ctx, client) d.runProfiles(ctx, d.client)
if d.rs.DryRun { if d.rs.DryRun {
if d.kubeClientSandbox == nil {
return fmt.Errorf("kubeClientSandbox is nil in DryRun mode")
}
klog.V(3).Infof("Restoring evicted pods from cache")
if err := d.kubeClientSandbox.restoreEvictedPods(ctx); err != nil {
klog.ErrorS(err, "Failed to restore evicted pods")
return fmt.Errorf("failed to restore evicted pods: %w", err)
}
d.kubeClientSandbox.reset() d.kubeClientSandbox.reset()
} }
@@ -762,7 +916,8 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
} }
} }
descheduler, err := newDescheduler(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion, eventRecorder, sharedInformerFactory, namespacedSharedInformerFactory) // Always create descheduler with real client/factory first to register all informers
descheduler, err := newDescheduler(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion, eventRecorder, rs.Client, sharedInformerFactory, nil)
if err != nil { if err != nil {
span.AddEvent("Failed to create new descheduler", trace.WithAttributes(attribute.String("err", err.Error()))) span.AddEvent("Failed to create new descheduler", trace.WithAttributes(attribute.String("err", err.Error())))
return err return err
@@ -776,17 +931,46 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
return err return err
} }
// If in dry run mode, replace the descheduler with one using fake client/factory
if rs.DryRun {
// Create sandbox with resources to mirror from real client
kubeClientSandbox, err := newDefaultKubeClientSandbox(rs.Client, sharedInformerFactory)
if err != nil {
span.AddEvent("Failed to create kube client sandbox", trace.WithAttributes(attribute.String("err", err.Error())))
return fmt.Errorf("failed to create kube client sandbox: %v", err)
}
klog.V(3).Infof("Building a cached client from the cluster for the dry run")
// TODO(ingvagabund): drop the previous queue
// TODO(ingvagabund): stop the previous pod evictor
// Replace descheduler with one using fake client/factory
descheduler, err = newDescheduler(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion, eventRecorder, kubeClientSandbox.fakeClient(), kubeClientSandbox.fakeSharedInformerFactory(), kubeClientSandbox)
if err != nil {
span.AddEvent("Failed to create dry run descheduler", trace.WithAttributes(attribute.String("err", err.Error())))
return err
}
}
// In dry run mode, start and sync the fake shared informer factory so it can mirror
// events from the real factory. Reliable propagation depends on both factories being
// fully synced (see WaitForCacheSync calls below), not solely on startup order.
if rs.DryRun {
descheduler.kubeClientSandbox.fakeSharedInformerFactory().Start(ctx.Done())
descheduler.kubeClientSandbox.fakeSharedInformerFactory().WaitForCacheSync(ctx.Done())
}
sharedInformerFactory.Start(ctx.Done()) sharedInformerFactory.Start(ctx.Done())
if metricProviderTokenReconciliation == secretReconciliation { if metricProviderTokenReconciliation == secretReconciliation {
namespacedSharedInformerFactory.Start(ctx.Done()) namespacedSharedInformerFactory.Start(ctx.Done())
} }
sharedInformerFactory.WaitForCacheSync(ctx.Done()) sharedInformerFactory.WaitForCacheSync(ctx.Done())
descheduler.podEvictor.WaitForEventHandlersSync(ctx)
if metricProviderTokenReconciliation == secretReconciliation { if metricProviderTokenReconciliation == secretReconciliation {
namespacedSharedInformerFactory.WaitForCacheSync(ctx.Done()) namespacedSharedInformerFactory.WaitForCacheSync(ctx.Done())
} }
descheduler.podEvictor.WaitForEventHandlersSync(ctx)
if descheduler.metricsCollector != nil { if descheduler.metricsCollector != nil {
go func() { go func() {
klog.V(2).Infof("Starting metrics collector") klog.V(2).Infof("Starting metrics collector")

View File

@@ -13,15 +13,18 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1" policy "k8s.io/api/policy/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
apiversion "k8s.io/apimachinery/pkg/version" apiversion "k8s.io/apimachinery/pkg/version"
fakediscovery "k8s.io/client-go/discovery/fake" fakediscovery "k8s.io/client-go/discovery/fake"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
fakeclientset "k8s.io/client-go/kubernetes/fake" fakeclientset "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing" core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/component-base/featuregate" "k8s.io/component-base/featuregate"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/metrics/pkg/apis/metrics/v1beta1" "k8s.io/metrics/pkg/apis/metrics/v1beta1"
@@ -196,20 +199,68 @@ func initDescheduler(t *testing.T, ctx context.Context, featureGates featuregate
sharedInformerFactory := informers.NewSharedInformerFactoryWithOptions(rs.Client, 0, informers.WithTransform(trimManagedFields)) sharedInformerFactory := informers.NewSharedInformerFactoryWithOptions(rs.Client, 0, informers.WithTransform(trimManagedFields))
eventBroadcaster, eventRecorder := utils.GetRecorderAndBroadcaster(ctx, client) eventBroadcaster, eventRecorder := utils.GetRecorderAndBroadcaster(ctx, client)
descheduler, err := newDescheduler(ctx, rs, internalDeschedulerPolicy, "v1", eventRecorder, sharedInformerFactory, nil) // Always create descheduler with real client/factory first to register all informers
descheduler, err := newDescheduler(ctx, rs, internalDeschedulerPolicy, "v1", eventRecorder, rs.Client, sharedInformerFactory, nil)
if err != nil { if err != nil {
eventBroadcaster.Shutdown() eventBroadcaster.Shutdown()
t.Fatalf("Unable to create a descheduler instance: %v", err) t.Fatalf("Unable to create descheduler instance: %v", err)
} }
// Setup Prometheus provider (only for real client case, not for dry run)
if err := setupPrometheusProvider(descheduler, nil); err != nil { if err := setupPrometheusProvider(descheduler, nil); err != nil {
eventBroadcaster.Shutdown() eventBroadcaster.Shutdown()
t.Fatalf("Failed to setup Prometheus provider: %v", err) t.Fatalf("Failed to setup Prometheus provider: %v", err)
} }
// If in dry run mode, replace the descheduler with one using fake client/factory
if dryRun {
// Create sandbox with resources to mirror from real client
kubeClientSandbox, err := newDefaultKubeClientSandbox(rs.Client, sharedInformerFactory)
if err != nil {
eventBroadcaster.Shutdown()
t.Fatalf("Failed to create kube client sandbox: %v", err)
}
// Replace descheduler with one using fake client/factory
descheduler, err = newDescheduler(ctx, rs, internalDeschedulerPolicy, "v1", eventRecorder, kubeClientSandbox.fakeClient(), kubeClientSandbox.fakeSharedInformerFactory(), kubeClientSandbox)
if err != nil {
eventBroadcaster.Shutdown()
t.Fatalf("Unable to create dry run descheduler instance: %v", err)
}
}
// Start the real shared informer factory after creating the descheduler
if dryRun {
descheduler.kubeClientSandbox.fakeSharedInformerFactory().Start(ctx.Done())
descheduler.kubeClientSandbox.fakeSharedInformerFactory().WaitForCacheSync(ctx.Done())
}
sharedInformerFactory.Start(ctx.Done()) sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done()) sharedInformerFactory.WaitForCacheSync(ctx.Done())
if dryRun {
if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
for _, obj := range objects {
exists, err := descheduler.kubeClientSandbox.hasRuntimeObjectInIndexer(obj)
if err != nil {
return false, err
}
metaObj, err := meta.Accessor(obj)
if err != nil {
return false, fmt.Errorf("failed to get object metadata: %w", err)
}
key := cache.MetaObjectToName(metaObj).String()
if !exists {
klog.Infof("Object %q has not propagated to the indexer", key)
return false, nil
}
klog.Infof("Object %q has propagated to the indexer", key)
}
return true, nil
}); err != nil {
t.Fatalf("nodes did not propagate to the indexer: %v", err)
}
}
return rs, descheduler, client return rs, descheduler, client
} }
@@ -550,36 +601,12 @@ func TestPodEvictorReset(t *testing.T) {
} }
} }
// runDeschedulerLoopAndGetEvictedPods is a temporary duplication from runDeschedulerLoop // runDeschedulerLoopAndGetEvictedPods runs a descheduling cycle and returns the names of evicted pods.
// that will be removed after kubeClientSandbox gets migrated to event handlers. // This is similar to runDeschedulerLoop but captures evicted pod names before the cache is reset.
func runDeschedulerLoopAndGetEvictedPods(ctx context.Context, t *testing.T, d *descheduler, dryRun bool) []string { func runDeschedulerLoopAndGetEvictedPods(ctx context.Context, t *testing.T, d *descheduler, dryRun bool) []string {
var clientSet clientset.Interface
if dryRun {
if err := d.kubeClientSandbox.buildSandbox(); err != nil {
t.Fatalf("Failed to build sandbox: %v", err)
}
getPodsAssignedToNode, err := setupInformerIndexers(d.kubeClientSandbox.fakeSharedInformerFactory(), d.deschedulerPolicy)
if err != nil {
t.Fatalf("Failed to setup indexers: %v", err)
}
d.getPodsAssignedToNode = getPodsAssignedToNode
fakeCtx, cncl := context.WithCancel(context.TODO())
defer cncl()
d.kubeClientSandbox.fakeSharedInformerFactory().Start(fakeCtx.Done())
d.kubeClientSandbox.fakeSharedInformerFactory().WaitForCacheSync(fakeCtx.Done())
clientSet = d.kubeClientSandbox.fakeClient()
d.sharedInformerFactory = d.kubeClientSandbox.fakeSharedInformerFactory()
} else {
clientSet = d.rs.Client
}
d.podEvictor.SetClient(clientSet)
d.podEvictor.ResetCounters() d.podEvictor.ResetCounters()
d.runProfiles(ctx, clientSet) d.runProfiles(ctx, d.client)
var evictedPodNames []string var evictedPodNames []string
if dryRun { if dryRun {
@@ -587,6 +614,10 @@ func runDeschedulerLoopAndGetEvictedPods(ctx context.Context, t *testing.T, d *d
for _, pod := range evictedPodsFromCache { for _, pod := range evictedPodsFromCache {
evictedPodNames = append(evictedPodNames, pod.Name) evictedPodNames = append(evictedPodNames, pod.Name)
} }
if err := d.kubeClientSandbox.restoreEvictedPods(ctx); err != nil {
t.Fatalf("Failed to restore evicted pods: %v", err)
}
d.kubeClientSandbox.reset() d.kubeClientSandbox.reset()
} }
@@ -962,6 +993,8 @@ func TestNodeLabelSelectorBasedEviction(t *testing.T) {
p3 := test.BuildTestPod("p3", 200, 0, node3.Name, updatePod) p3 := test.BuildTestPod("p3", 200, 0, node3.Name, updatePod)
p4 := test.BuildTestPod("p4", 200, 0, node4.Name, updatePod) p4 := test.BuildTestPod("p4", 200, 0, node4.Name, updatePod)
objects := []runtime.Object{node1, node2, node3, node4, p1, p2, p3, p4}
// Map pod names to their node names for validation // Map pod names to their node names for validation
podToNode := map[string]string{ podToNode := map[string]string{
"p1": "n1", "p1": "n1",
@@ -976,7 +1009,7 @@ func TestNodeLabelSelectorBasedEviction(t *testing.T) {
} }
ctxCancel, cancel := context.WithCancel(ctx) ctxCancel, cancel := context.WithCancel(ctx)
_, deschedulerInstance, client := initDescheduler(t, ctxCancel, initFeatureGates(), policy, nil, tc.dryRun, node1, node2, node3, node4, p1, p2, p3, p4) _, deschedulerInstance, client := initDescheduler(t, ctxCancel, initFeatureGates(), policy, nil, tc.dryRun, objects...)
defer cancel() defer cancel()
// Verify all pods are created initially // Verify all pods are created initially
@@ -1008,13 +1041,13 @@ func TestNodeLabelSelectorBasedEviction(t *testing.T) {
// Verify the correct number of nodes had pods evicted // Verify the correct number of nodes had pods evicted
if len(nodesWithEvictedPods) != len(tc.expectedEvictedFromNodes) { if len(nodesWithEvictedPods) != len(tc.expectedEvictedFromNodes) {
t.Errorf("Expected pods to be evicted from %d nodes, got %d nodes: %v", len(tc.expectedEvictedFromNodes), len(nodesWithEvictedPods), nodesWithEvictedPods) t.Fatalf("Expected pods to be evicted from %d nodes, got %d nodes: %v", len(tc.expectedEvictedFromNodes), len(nodesWithEvictedPods), nodesWithEvictedPods)
} }
// Verify pods were evicted from the correct nodes // Verify pods were evicted from the correct nodes
for _, nodeName := range tc.expectedEvictedFromNodes { for _, nodeName := range tc.expectedEvictedFromNodes {
if !nodesWithEvictedPods[nodeName] { if !nodesWithEvictedPods[nodeName] {
t.Errorf("Expected pod to be evicted from node %s, but it was not", nodeName) t.Fatalf("Expected pod to be evicted from node %s, but it was not", nodeName)
} }
} }
@@ -1028,7 +1061,7 @@ func TestNodeLabelSelectorBasedEviction(t *testing.T) {
} }
} }
if !found { if !found {
t.Errorf("Unexpected eviction from node %s", nodeName) t.Fatalf("Unexpected eviction from node %s", nodeName)
} }
} }
@@ -1141,10 +1174,6 @@ func TestKubeClientSandboxReset(t *testing.T) {
t.Fatalf("Failed to create kubeClientSandbox: %v", err) t.Fatalf("Failed to create kubeClientSandbox: %v", err)
} }
if err := sandbox.buildSandbox(); err != nil {
t.Fatalf("Failed to build sandbox: %v", err)
}
eviction1 := &policy.Eviction{ eviction1 := &policy.Eviction{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: p1.Name, Name: p1.Name,
@@ -1158,6 +1187,9 @@ func TestKubeClientSandboxReset(t *testing.T) {
}, },
} }
sandbox.fakeSharedInformerFactory().Start(ctx.Done())
sandbox.fakeSharedInformerFactory().WaitForCacheSync(ctx.Done())
if err := sandbox.fakeClient().CoreV1().Pods(p1.Namespace).EvictV1(context.TODO(), eviction1); err != nil { if err := sandbox.fakeClient().CoreV1().Pods(p1.Namespace).EvictV1(context.TODO(), eviction1); err != nil {
t.Fatalf("Error evicting p1: %v", err) t.Fatalf("Error evicting p1: %v", err)
} }
@@ -1486,3 +1518,142 @@ func TestPodEvictionReactionFncErrorHandling(t *testing.T) {
}) })
} }
} }
// verifyPodIdentityFields checks if name, namespace, and UID match expected values
func verifyPodIdentityFields(t *testing.T, name, namespace, uid, expectedName, expectedNamespace, expectedUID, context string) {
t.Helper()
if name != expectedName {
t.Fatalf("Expected pod name %s%s, got %s", expectedName, context, name)
}
if namespace != expectedNamespace {
t.Fatalf("Expected pod namespace %s%s, got %s", expectedNamespace, context, namespace)
}
if uid != expectedUID {
t.Fatalf("Expected pod UID %s%s, got %s", expectedUID, context, uid)
}
}
// verifyPodIdentity checks if a pod has the expected name, namespace, and UID
func verifyPodIdentity(t *testing.T, pod *v1.Pod, expectedName, expectedNamespace string, expectedUID types.UID) {
t.Helper()
verifyPodIdentityFields(t, pod.Name, pod.Namespace, string(pod.UID), expectedName, expectedNamespace, string(expectedUID), "")
}
func TestEvictedPodRestorationInDryRun(t *testing.T) {
// Initialize klog flags
// klog.InitFlags(nil)
// Set verbosity level (higher number = more verbose)
// 0 = errors only, 1-4 = info, 5-9 = debug, 10+ = trace
// flag.Set("v", "4")
initPluginRegistry()
ctx := context.Background()
node1 := test.BuildTestNode("n1", 2000, 3000, 10, taintNodeNoSchedule)
node2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
p1 := test.BuildTestPod("p1", 100, 0, node1.Name, test.SetRSOwnerRef)
internalDeschedulerPolicy := removePodsViolatingNodeTaintsPolicy()
ctxCancel, cancel := context.WithCancel(ctx)
defer cancel()
// Create descheduler with DryRun mode
client := fakeclientset.NewSimpleClientset(node1, node2, p1)
eventClient := fakeclientset.NewSimpleClientset(node1, node2, p1)
rs, err := options.NewDeschedulerServer()
if err != nil {
t.Fatalf("Unable to initialize server: %v", err)
}
rs.Client = client
rs.EventClient = eventClient
rs.DefaultFeatureGates = initFeatureGates()
rs.DryRun = true // Set DryRun before creating descheduler
sharedInformerFactory := informers.NewSharedInformerFactoryWithOptions(rs.Client, 0, informers.WithTransform(trimManagedFields))
eventBroadcaster, eventRecorder := utils.GetRecorderAndBroadcaster(ctxCancel, client)
defer eventBroadcaster.Shutdown()
// Always create descheduler with real client/factory first to register all informers
descheduler, err := newDescheduler(ctxCancel, rs, internalDeschedulerPolicy, "v1", eventRecorder, rs.Client, sharedInformerFactory, nil)
if err != nil {
t.Fatalf("Unable to create descheduler instance: %v", err)
}
sharedInformerFactory.Start(ctxCancel.Done())
sharedInformerFactory.WaitForCacheSync(ctxCancel.Done())
// Create sandbox with resources to mirror from real client
kubeClientSandbox, err := newDefaultKubeClientSandbox(rs.Client, sharedInformerFactory)
if err != nil {
t.Fatalf("Failed to create kube client sandbox: %v", err)
}
// Replace descheduler with one using fake client/factory
descheduler, err = newDescheduler(ctxCancel, rs, internalDeschedulerPolicy, "v1", eventRecorder, kubeClientSandbox.fakeClient(), kubeClientSandbox.fakeSharedInformerFactory(), kubeClientSandbox)
if err != nil {
t.Fatalf("Unable to create dry run descheduler instance: %v", err)
}
// Start and sync the fake factory after creating the descheduler
kubeClientSandbox.fakeSharedInformerFactory().Start(ctxCancel.Done())
kubeClientSandbox.fakeSharedInformerFactory().WaitForCacheSync(ctxCancel.Done())
// Verify the pod exists in the fake client after initialization
pod, err := kubeClientSandbox.fakeClient().CoreV1().Pods(p1.Namespace).Get(ctx, p1.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Expected pod %s to exist in fake client after initialization, but got error: %v", p1.Name, err)
}
verifyPodIdentity(t, pod, p1.Name, p1.Namespace, p1.UID)
klog.Infof("Pod %s exists in fake client after initialization", p1.Name)
// Run two descheduling cycles to verify pod eviction and restoration works repeatedly
for i := 1; i <= 2; i++ {
// Run descheduling cycle
klog.Infof("Running descheduling cycle %d", i)
descheduler.podEvictor.ResetCounters()
descheduler.runProfiles(ctx, descheduler.client)
// Verify the pod was evicted (should not exist in fake client anymore)
_, err = kubeClientSandbox.fakeClient().CoreV1().Pods(p1.Namespace).Get(ctx, p1.Name, metav1.GetOptions{})
if err == nil {
t.Fatalf("Expected pod %s to be evicted from fake client in cycle %d, but it still exists", p1.Name, i)
}
if !apierrors.IsNotFound(err) {
t.Fatalf("Expected NotFound error for pod %s in cycle %d, got: %v", p1.Name, i, err)
}
klog.Infof("Pod %s was successfully evicted from fake client in cycle %d", p1.Name, i)
// Verify the pod was added to the evicted pods cache
evictedPods := descheduler.kubeClientSandbox.evictedPodsCache.list()
if len(evictedPods) != 1 {
t.Fatalf("Expected 1 pod in evicted cache in cycle %d, got %d", i, len(evictedPods))
}
verifyPodIdentityFields(t, evictedPods[0].Name, evictedPods[0].Namespace, evictedPods[0].UID, p1.Name, p1.Namespace, string(p1.UID), fmt.Sprintf(" in cycle %d", i))
klog.Infof("Pod %s was successfully added to evicted pods cache in cycle %d (UID: %s)", p1.Name, i, p1.UID)
// Restore evicted pods
klog.Infof("Restoring evicted pods from cache in cycle %d", i)
if err := descheduler.kubeClientSandbox.restoreEvictedPods(ctx); err != nil {
t.Fatalf("Failed to restore evicted pods in cycle %d: %v", i, err)
}
descheduler.kubeClientSandbox.evictedPodsCache.clear()
// Verify the pod was restored back to the fake client
pod, err = kubeClientSandbox.fakeClient().CoreV1().Pods(p1.Namespace).Get(ctx, p1.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Expected pod %s to be restored to fake client in cycle %d, but got error: %v", p1.Name, i, err)
}
verifyPodIdentity(t, pod, p1.Name, p1.Namespace, p1.UID)
klog.Infof("Pod %s was successfully restored to fake client in cycle %d (UID: %s)", p1.Name, i, pod.UID)
// Verify cache was cleared after restoration
evictedPods = descheduler.kubeClientSandbox.evictedPodsCache.list()
if len(evictedPods) != 0 {
t.Fatalf("Expected evicted cache to be empty after restoration in cycle %d, got %d pods", i, len(evictedPods))
}
klog.Infof("Evicted pods cache was cleared after restoration in cycle %d", i)
}
}

View File

@@ -403,12 +403,6 @@ func (pe *PodEvictor) ResetCounters() {
pe.totalPodCount = 0 pe.totalPodCount = 0
} }
func (pe *PodEvictor) SetClient(client clientset.Interface) {
pe.mu.Lock()
defer pe.mu.Unlock()
pe.client = client
}
func (pe *PodEvictor) evictionRequestsTotal() uint { func (pe *PodEvictor) evictionRequestsTotal() uint {
if pe.featureGates.Enabled(features.EvictionsInBackground) { if pe.featureGates.Enabled(features.EvictionsInBackground) {
return pe.erCache.evictionRequestsTotal() return pe.erCache.evictionRequestsTotal()

View File

@@ -43,6 +43,10 @@ import (
// BuildTestPod creates a test pod with given parameters. // BuildTestPod creates a test pod with given parameters.
func BuildTestPod(name string, cpu, memory int64, nodeName string, apply func(*v1.Pod)) *v1.Pod { func BuildTestPod(name string, cpu, memory int64, nodeName string, apply func(*v1.Pod)) *v1.Pod {
pod := &v1.Pod{ pod := &v1.Pod{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Pod",
},
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Namespace: "default", Namespace: "default",
Name: name, Name: name,
@@ -76,6 +80,10 @@ func BuildTestPod(name string, cpu, memory int64, nodeName string, apply func(*v
func BuildTestPDB(name, appLabel string) *policyv1.PodDisruptionBudget { func BuildTestPDB(name, appLabel string) *policyv1.PodDisruptionBudget {
maxUnavailable := intstr.FromInt32(1) maxUnavailable := intstr.FromInt32(1)
pdb := &policyv1.PodDisruptionBudget{ pdb := &policyv1.PodDisruptionBudget{
TypeMeta: metav1.TypeMeta{
APIVersion: "policy/v1",
Kind: "PodDisruptionBudget",
},
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Namespace: "default", Namespace: "default",
Name: name, Name: name,
@@ -94,6 +102,10 @@ func BuildTestPDB(name, appLabel string) *policyv1.PodDisruptionBudget {
func BuildTestPVC(name, storageClass string) *v1.PersistentVolumeClaim { func BuildTestPVC(name, storageClass string) *v1.PersistentVolumeClaim {
pvc := &v1.PersistentVolumeClaim{ pvc := &v1.PersistentVolumeClaim{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "PersistentVolumeClaim",
},
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Namespace: "default", Namespace: "default",
Name: name, Name: name,
@@ -114,6 +126,10 @@ func BuildTestPVC(name, storageClass string) *v1.PersistentVolumeClaim {
// BuildPodMetrics creates a test podmetrics with given parameters. // BuildPodMetrics creates a test podmetrics with given parameters.
func BuildPodMetrics(name string, millicpu, mem int64) *v1beta1.PodMetrics { func BuildPodMetrics(name string, millicpu, mem int64) *v1beta1.PodMetrics {
return &v1beta1.PodMetrics{ return &v1beta1.PodMetrics{
TypeMeta: metav1.TypeMeta{
APIVersion: "metrics.k8s.io/v1beta1",
Kind: "PodMetrics",
},
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: name, Name: name,
Namespace: "default", Namespace: "default",
@@ -171,6 +187,10 @@ func GetDaemonSetOwnerRefList() []metav1.OwnerReference {
// BuildTestNode creates a node with specified capacity. // BuildTestNode creates a node with specified capacity.
func BuildTestNode(name string, millicpu, mem, pods int64, apply func(*v1.Node)) *v1.Node { func BuildTestNode(name string, millicpu, mem, pods int64, apply func(*v1.Node)) *v1.Node {
node := &v1.Node{ node := &v1.Node{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Node",
},
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: name, Name: name,
SelfLink: fmt.Sprintf("/api/v1/nodes/%s", name), SelfLink: fmt.Sprintf("/api/v1/nodes/%s", name),
@@ -201,6 +221,10 @@ func BuildTestNode(name string, millicpu, mem, pods int64, apply func(*v1.Node))
func BuildNodeMetrics(name string, millicpu, mem int64) *v1beta1.NodeMetrics { func BuildNodeMetrics(name string, millicpu, mem int64) *v1beta1.NodeMetrics {
return &v1beta1.NodeMetrics{ return &v1beta1.NodeMetrics{
TypeMeta: metav1.TypeMeta{
APIVersion: "metrics.k8s.io/v1beta1",
Kind: "NodeMetrics",
},
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: name, Name: name,
}, },