mirror of https://github.com/kubernetes-sigs/descheduler.git synced 2026-01-26 05:14:13 +01:00

Compare commits


25 Commits

Author SHA1 Message Date
Kubernetes Prow Robot
0b4fd3544c Merge pull request #1816 from ingvagabund/prometheus-client-update-test
test(pkg/descheduler): test a prometheus client update propagates to a plugin profile handle
2026-01-26 03:01:08 +05:30
Jan Chaloupka
fe2fb603f4 test(pkg/descheduler): test a prometheus client update propagates to a plugin profile handle 2026-01-25 22:00:28 +01:00
Kubernetes Prow Robot
a500ff9c64 Merge pull request #1814 from ingvagabund/refactorings
fix(kubeClientSandbox): do not wait for pods in the fake indexers if they are already deleted
2026-01-24 19:48:13 +05:30
Jan Chaloupka
263db33052 fix(kubeClientSandbox): do not wait for pods in the fake indexers if they are already deleted 2026-01-24 14:49:38 +01:00
Jan Chaloupka
45dc5a20d3 test(kubeClientSandbox): more unit tests 2026-01-24 14:47:17 +01:00
Jan Chaloupka
f520856095 refactor(kubeClientSandbox): move the code under a separate file 2026-01-24 14:47:10 +01:00
Jan Chaloupka
e53b3d5dce refactor(pkg/descheduler): drop unused clientset parameter from descheduler.runProfiles 2026-01-23 22:27:59 +01:00
Kubernetes Prow Robot
72d61286eb Merge pull request #1812 from ingvagabund/register-fake-factory-only-once
refactor(pkg/descheduler): create fake shared informer factory only once
2026-01-24 02:43:28 +05:30
Jan Chaloupka
770ec5affa refactor(pkg/descheduler): create fake shared informer factory only once 2026-01-23 21:34:09 +01:00
Kubernetes Prow Robot
38d99dd0c3 Merge pull request #1813 from ingvagabund/refactorings
refactor(pkg/descheduler): more handlers and dropping unused code
2026-01-23 21:27:30 +05:30
Jan Chaloupka
8f5a83279e refactor(pkg/descheduler): drop unused fakeEvictedPods variables in the unit tests 2026-01-23 15:31:57 +01:00
Jan Chaloupka
4daa7e2fbf refactor(pkg/descheduler): move prometheus setup under a helper
Prometheus is not used anywhere in the tests so there's no need to setup
it there.
2026-01-23 15:31:57 +01:00
Jan Chaloupka
433f0dbb8c refactor(pkg/descheduler): define a helper for newKubeClientSandbox with the default list of resources 2026-01-23 15:31:51 +01:00
Kubernetes Prow Robot
cc96a3ee7a Merge pull request #1811 from ingvagabund/kube-client-sandbox
refactor(pkg/operator): replace informerResource with a kubeClientSandbox
2026-01-22 19:59:28 +05:30
Jan Chaloupka
ff580a0eff refactor(kubeClientSandbox): keep a cache of evicted pods and allow to reset it at the end of each descheduling cycle 2026-01-22 14:49:47 +01:00
Jan Chaloupka
4af097a806 refactor(pkg/operator): create a helper for registering indexer in the dry run mode 2026-01-22 14:44:41 +01:00
Jan Chaloupka
b3f0184af8 refactor(kubeClientSandbox): helpers for creating a node selector and node selector indexer 2026-01-22 14:44:32 +01:00
Jan Chaloupka
881ead3ed2 refactor(kubeClientSandbox): set the create pods reactor in buildSandbox 2026-01-22 14:44:14 +01:00
Jan Chaloupka
fc6d0d1132 refactor(pkg/operator): replace informerResource with a kubeClientSandbox 2026-01-22 14:41:48 +01:00
Kubernetes Prow Robot
85b1d97dda Merge pull request #1810 from ingvagabund/refactorings
chore(pkg/descheduler): make TestPodEvictorReset table driven
2026-01-20 19:08:49 +05:30
Jan Chaloupka
b6aadc1643 chore(pkg/descheduler): make TestPodEvictorReset table driven 2026-01-20 12:51:58 +01:00
Kubernetes Prow Robot
c4ec31684f Merge pull request #1802 from ingvagabund/global-node-selector-as-indexer
feat: register a node indexer for the global node selector instead of listing nodes with the selector
2026-01-12 15:08:13 +05:30
Kubernetes Prow Robot
7d2c31cd39 Merge pull request #1808 from ingvagabund/profile-instance-id
feat(profile): inject a plugin instance ID to each built plugin
2026-01-09 15:33:43 +05:30
Jan Chaloupka
cf9edca33c feat(profile): inject a plugin instance ID to each built plugin 2026-01-06 12:26:35 +01:00
Jan Chaloupka
93a516a58a feat: register a node indexer for the global node selector instead of listing nodes with the selector
To avoid iterating through every node every time a list of nodes is
requested. This is prerequisite work for introducing profile level
node selectors.
2025-12-19 23:25:24 +01:00
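
The commit above (93a516a58a) replaces selector-based node listing with an informer indexer. Below is a minimal standalone sketch of that pattern, assuming only a client-go SharedIndexInformer for nodes; the index name, bucket key, and helper names are illustrative, not the descheduler's actual identifiers.

// Sketch: bucket nodes matching a label selector under one index key so a
// single ByIndex lookup replaces listing and filtering every node.
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

const nodeSelectorIndex = "by-node-selector" // illustrative index name

// addNodeSelectorIndex registers the indexer; indexers must be added before
// the informer is started.
func addNodeSelectorIndex(nodeInformer cache.SharedIndexInformer, sel labels.Selector) error {
	return nodeInformer.AddIndexers(cache.Indexers{
		nodeSelectorIndex: func(obj interface{}) ([]string, error) {
			node, ok := obj.(*v1.Node)
			if !ok {
				return nil, fmt.Errorf("expected *v1.Node, got %T", obj)
			}
			if sel.Matches(labels.Set(node.Labels)) {
				return []string{nodeSelectorIndex}, nil
			}
			return nil, nil
		},
	})
}

func main() {
	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{
		Name:   "n1",
		Labels: map[string]string{"environment": "production"},
	}}
	client := fake.NewSimpleClientset(node)
	factory := informers.NewSharedInformerFactory(client, 0)
	nodeInformer := factory.Core().V1().Nodes().Informer()

	sel, _ := labels.Parse("environment=production")
	if err := addNodeSelectorIndex(nodeInformer, sel); err != nil {
		panic(err)
	}

	stop := make(chan struct{})
	defer close(stop)
	factory.Start(stop)
	factory.WaitForCacheSync(stop)

	// One indexed lookup instead of iterating over the whole node cache.
	matches, err := nodeInformer.GetIndexer().ByIndex(nodeSelectorIndex, nodeSelectorIndex)
	if err != nil {
		panic(err)
	}
	fmt.Println("matching nodes:", len(matches)) // 1
}
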
13 changed files with 2574 additions and 267 deletions

View File

@@ -30,14 +30,9 @@ import (
"go.opentelemetry.io/otel/trace"
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
policyv1 "k8s.io/api/policy/v1"
schedulingv1 "k8s.io/api/scheduling/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
utilversion "k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/util/wait"
@@ -47,7 +42,6 @@ import (
fakeclientset "k8s.io/client-go/kubernetes/fake"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/rest"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/util/workqueue"
@@ -74,6 +68,7 @@ import (
const (
prometheusAuthTokenSecretKey = "prometheusAuthToken"
workQueueKey = "key"
indexerNodeSelectorGlobal = "indexer_node_selector_global"
)
type eprunner func(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status
@@ -85,14 +80,14 @@ type profileRunner struct {
type descheduler struct {
rs *options.DeschedulerServer
ir *informerResources
client clientset.Interface
kubeClientSandbox *kubeClientSandbox
getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc
sharedInformerFactory informers.SharedInformerFactory
namespacedSecretsLister corev1listers.SecretNamespaceLister
deschedulerPolicy *api.DeschedulerPolicy
eventRecorder events.EventRecorder
podEvictor *evictions.PodEvictor
podEvictionReactionFnc func(*fakeclientset.Clientset) func(action core.Action) (bool, runtime.Object, error)
metricsCollector *metricscollector.MetricsCollector
prometheusClient promapi.Client
previousPrometheusClientTransport *http.Transport
@@ -101,48 +96,20 @@ type descheduler struct {
metricsProviders map[api.MetricsSource]*api.MetricsProvider
}
type informerResources struct {
sharedInformerFactory informers.SharedInformerFactory
resourceToInformer map[schema.GroupVersionResource]informers.GenericInformer
func nodeSelectorFromPolicy(deschedulerPolicy *api.DeschedulerPolicy) (labels.Selector, error) {
nodeSelector := labels.Everything()
if deschedulerPolicy.NodeSelector != nil {
sel, err := labels.Parse(*deschedulerPolicy.NodeSelector)
if err != nil {
return nil, err
}
nodeSelector = sel
}
return nodeSelector, nil
}
func newInformerResources(sharedInformerFactory informers.SharedInformerFactory) *informerResources {
return &informerResources{
sharedInformerFactory: sharedInformerFactory,
resourceToInformer: make(map[schema.GroupVersionResource]informers.GenericInformer),
}
}
func (ir *informerResources) Uses(resources ...schema.GroupVersionResource) error {
for _, resource := range resources {
informer, err := ir.sharedInformerFactory.ForResource(resource)
if err != nil {
return err
}
ir.resourceToInformer[resource] = informer
}
return nil
}
// CopyTo Copy informer subscriptions to the new factory and objects to the fake client so that the backing caches are populated for when listers are used.
func (ir *informerResources) CopyTo(fakeClient *fakeclientset.Clientset, newFactory informers.SharedInformerFactory) error {
for resource, informer := range ir.resourceToInformer {
_, err := newFactory.ForResource(resource)
if err != nil {
return fmt.Errorf("error getting resource %s: %w", resource, err)
}
objects, err := informer.Lister().List(labels.Everything())
if err != nil {
return fmt.Errorf("error listing %s: %w", informer, err)
}
for _, object := range objects {
fakeClient.Tracker().Add(object)
}
}
return nil
func addNodeSelectorIndexer(sharedInformerFactory informers.SharedInformerFactory, nodeSelector labels.Selector) error {
return nodeutil.AddNodeSelectorIndexer(sharedInformerFactory.Core().V1().Nodes().Informer(), indexerNodeSelectorGlobal, nodeSelector)
}
func metricsProviderListToMap(providersList []api.MetricsProvider) map[api.MetricsSource]*api.MetricsProvider {
@@ -153,19 +120,28 @@ func metricsProviderListToMap(providersList []api.MetricsProvider) map[api.Metri
return providersMap
}
func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string, eventRecorder events.EventRecorder, sharedInformerFactory, namespacedSharedInformerFactory informers.SharedInformerFactory) (*descheduler, error) {
podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
// setupPrometheusProvider sets up the prometheus provider on the descheduler if configured
func setupPrometheusProvider(d *descheduler, namespacedSharedInformerFactory informers.SharedInformerFactory) error {
prometheusProvider := d.metricsProviders[api.PrometheusMetrics]
if prometheusProvider != nil && prometheusProvider.Prometheus != nil && prometheusProvider.Prometheus.AuthToken != nil {
authTokenSecret := prometheusProvider.Prometheus.AuthToken.SecretReference
if authTokenSecret == nil || authTokenSecret.Namespace == "" {
return fmt.Errorf("prometheus metrics source configuration is missing authentication token secret")
}
if namespacedSharedInformerFactory == nil {
return fmt.Errorf("namespacedSharedInformerFactory not configured")
}
namespacedSharedInformerFactory.Core().V1().Secrets().Informer().AddEventHandler(d.eventHandler())
d.namespacedSecretsLister = namespacedSharedInformerFactory.Core().V1().Secrets().Lister().Secrets(authTokenSecret.Namespace)
}
return nil
}
ir := newInformerResources(sharedInformerFactory)
ir.Uses(v1.SchemeGroupVersion.WithResource("pods"),
v1.SchemeGroupVersion.WithResource("nodes"),
// Future work could be to let each plugin declare what type of resources it needs; that way dry runs would stay
// consistent with the real runs without having to keep the list here in sync.
v1.SchemeGroupVersion.WithResource("namespaces"), // Used by the defaultevictor plugin
schedulingv1.SchemeGroupVersion.WithResource("priorityclasses"), // Used by the defaultevictor plugin
policyv1.SchemeGroupVersion.WithResource("poddisruptionbudgets"), // Used by the defaultevictor plugin
v1.SchemeGroupVersion.WithResource("persistentvolumeclaims"), // Used by the defaultevictor plugin
) // Used by the defaultevictor plugin
func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string, eventRecorder events.EventRecorder, client clientset.Interface, sharedInformerFactory informers.SharedInformerFactory, kubeClientSandbox *kubeClientSandbox) (*descheduler, error) {
podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
// Temporarily register the PVC because it is used by the DefaultEvictor plugin during
// the descheduling cycle, where informer registration is ignored.
_ = sharedInformerFactory.Core().V1().PersistentVolumeClaims().Informer()
getPodsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer)
if err != nil {
@@ -174,7 +150,7 @@ func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedu
podEvictor, err := evictions.NewPodEvictor(
ctx,
rs.Client,
client,
eventRecorder,
podInformer,
rs.DefaultFeatureGates,
@@ -193,44 +169,32 @@ func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedu
}
desch := &descheduler{
rs: rs,
ir: ir,
getPodsAssignedToNode: getPodsAssignedToNode,
sharedInformerFactory: sharedInformerFactory,
deschedulerPolicy: deschedulerPolicy,
eventRecorder: eventRecorder,
podEvictor: podEvictor,
podEvictionReactionFnc: podEvictionReactionFnc,
prometheusClient: rs.PrometheusClient,
queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{Name: "descheduler"}),
metricsProviders: metricsProviderListToMap(deschedulerPolicy.MetricsProviders),
rs: rs,
client: client,
kubeClientSandbox: kubeClientSandbox,
getPodsAssignedToNode: getPodsAssignedToNode,
sharedInformerFactory: sharedInformerFactory,
deschedulerPolicy: deschedulerPolicy,
eventRecorder: eventRecorder,
podEvictor: podEvictor,
prometheusClient: rs.PrometheusClient,
queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{Name: "descheduler"}),
metricsProviders: metricsProviderListToMap(deschedulerPolicy.MetricsProviders),
}
nodeSelector, err := nodeSelectorFromPolicy(deschedulerPolicy)
if err != nil {
return nil, err
}
if err := addNodeSelectorIndexer(sharedInformerFactory, nodeSelector); err != nil {
return nil, err
}
if rs.MetricsClient != nil {
nodeSelector := labels.Everything()
if deschedulerPolicy.NodeSelector != nil {
sel, err := labels.Parse(*deschedulerPolicy.NodeSelector)
if err != nil {
return nil, err
}
nodeSelector = sel
}
desch.metricsCollector = metricscollector.NewMetricsCollector(sharedInformerFactory.Core().V1().Nodes().Lister(), rs.MetricsClient, nodeSelector)
}
prometheusProvider := desch.metricsProviders[api.PrometheusMetrics]
if prometheusProvider != nil && prometheusProvider.Prometheus != nil && prometheusProvider.Prometheus.AuthToken != nil {
authTokenSecret := prometheusProvider.Prometheus.AuthToken.SecretReference
if authTokenSecret == nil || authTokenSecret.Namespace == "" {
return nil, fmt.Errorf("prometheus metrics source configuration is missing authentication token secret")
}
if namespacedSharedInformerFactory == nil {
return nil, fmt.Errorf("namespacedSharedInformerFactory not configured")
}
namespacedSharedInformerFactory.Core().V1().Secrets().Informer().AddEventHandler(desch.eventHandler())
desch.namespacedSecretsLister = namespacedSharedInformerFactory.Core().V1().Secrets().Lister().Secrets(authTokenSecret.Namespace)
}
return desch, nil
}
@@ -345,7 +309,7 @@ func (d *descheduler) eventHandler() cache.ResourceEventHandler {
}
}
func (d *descheduler) runDeschedulerLoop(ctx context.Context, nodes []*v1.Node) error {
func (d *descheduler) runDeschedulerLoop(ctx context.Context) error {
var span trace.Span
ctx, span = tracing.Tracer().Start(ctx, "runDeschedulerLoop")
defer span.End()
@@ -354,52 +318,22 @@ func (d *descheduler) runDeschedulerLoop(ctx context.Context, nodes []*v1.Node)
metrics.LoopDuration.With(map[string]string{}).Observe(time.Since(loopStartDuration).Seconds())
}(time.Now())
// if len is still <= 1 error out
if len(nodes) <= 1 {
klog.InfoS("Skipping descheduling cycle: requires >=2 nodes", "found", len(nodes))
return nil // gracefully skip this cycle instead of aborting
}
var client clientset.Interface
// When the dry mode is enable, collect all the relevant objects (mostly pods) under a fake client.
// So when evicting pods while running multiple strategies in a row have the cummulative effect
// as is when evicting pods for real.
if d.rs.DryRun {
klog.V(3).Infof("Building a cached client from the cluster for the dry run")
// Create a new cache so we start from scratch without any leftovers
fakeClient := fakeclientset.NewSimpleClientset()
// simulate a pod eviction by deleting a pod
fakeClient.PrependReactor("create", "pods", d.podEvictionReactionFnc(fakeClient))
fakeSharedInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
err := d.ir.CopyTo(fakeClient, fakeSharedInformerFactory)
if err != nil {
return err
}
// create a new instance of the shared informer factor from the cached client
// register the pod informer, otherwise it will not get running
d.getPodsAssignedToNode, err = podutil.BuildGetPodsAssignedToNodeFunc(fakeSharedInformerFactory.Core().V1().Pods().Informer())
if err != nil {
return fmt.Errorf("build get pods assigned to node function error: %v", err)
}
fakeCtx, cncl := context.WithCancel(context.TODO())
defer cncl()
fakeSharedInformerFactory.Start(fakeCtx.Done())
fakeSharedInformerFactory.WaitForCacheSync(fakeCtx.Done())
client = fakeClient
d.sharedInformerFactory = fakeSharedInformerFactory
} else {
client = d.rs.Client
}
klog.V(3).Infof("Setting up the pod evictor")
d.podEvictor.SetClient(client)
klog.V(3).Infof("Resetting pod evictor counters")
d.podEvictor.ResetCounters()
d.runProfiles(ctx, client, nodes)
d.runProfiles(ctx)
if d.rs.DryRun {
if d.kubeClientSandbox == nil {
return fmt.Errorf("kubeClientSandbox is nil in DryRun mode")
}
klog.V(3).Infof("Restoring evicted pods from cache")
if err := d.kubeClientSandbox.restoreEvictedPods(ctx); err != nil {
klog.ErrorS(err, "Failed to restore evicted pods")
return fmt.Errorf("failed to restore evicted pods: %w", err)
}
d.kubeClientSandbox.reset()
}
klog.V(1).InfoS("Number of evictions/requests", "totalEvicted", d.podEvictor.TotalEvicted(), "evictionRequests", d.podEvictor.TotalEvictionRequests())
@@ -409,22 +343,46 @@ func (d *descheduler) runDeschedulerLoop(ctx context.Context, nodes []*v1.Node)
// runProfiles runs all the deschedule plugins of all profiles and
// later runs through all balance plugins of all profiles. (All Balance plugins should come after all Deschedule plugins)
// see https://github.com/kubernetes-sigs/descheduler/issues/979
func (d *descheduler) runProfiles(ctx context.Context, client clientset.Interface, nodes []*v1.Node) {
func (d *descheduler) runProfiles(ctx context.Context) {
var span trace.Span
ctx, span = tracing.Tracer().Start(ctx, "runProfiles")
defer span.End()
nodesAsInterface, err := d.sharedInformerFactory.Core().V1().Nodes().Informer().GetIndexer().ByIndex(indexerNodeSelectorGlobal, indexerNodeSelectorGlobal)
if err != nil {
span.AddEvent("Failed to list nodes with global node selector", trace.WithAttributes(attribute.String("err", err.Error())))
klog.Error(err)
return
}
nodes, err := nodeutil.ReadyNodesFromInterfaces(nodesAsInterface)
if err != nil {
span.AddEvent("Failed to convert node as interfaces into ready nodes", trace.WithAttributes(attribute.String("err", err.Error())))
klog.Error(err)
return
}
// if len is still <= 1 error out
if len(nodes) <= 1 {
klog.InfoS("Skipping descheduling cycle: requires >=2 nodes", "found", len(nodes))
return // gracefully skip this cycle instead of aborting
}
var profileRunners []profileRunner
for _, profile := range d.deschedulerPolicy.Profiles {
for idx, profile := range d.deschedulerPolicy.Profiles {
currProfile, err := frameworkprofile.NewProfile(
ctx,
profile,
pluginregistry.PluginRegistry,
frameworkprofile.WithClientSet(client),
frameworkprofile.WithClientSet(d.client),
frameworkprofile.WithSharedInformerFactory(d.sharedInformerFactory),
frameworkprofile.WithPodEvictor(d.podEvictor),
frameworkprofile.WithGetPodsAssignedToNodeFnc(d.getPodsAssignedToNode),
frameworkprofile.WithMetricsCollector(d.metricsCollector),
frameworkprofile.WithPrometheusClient(d.prometheusClient),
// Generate a unique instance ID using just the index to avoid long IDs
// when profile names are very long
frameworkprofile.WithProfileInstanceID(fmt.Sprintf("%d", idx)),
)
if err != nil {
klog.ErrorS(err, "unable to create a profile", "profile", profile.Name)
@@ -551,27 +509,6 @@ func validateVersionCompatibility(discovery discovery.DiscoveryInterface, desche
return nil
}
func podEvictionReactionFnc(fakeClient *fakeclientset.Clientset) func(action core.Action) (bool, runtime.Object, error) {
return func(action core.Action) (bool, runtime.Object, error) {
if action.GetSubresource() == "eviction" {
createAct, matched := action.(core.CreateActionImpl)
if !matched {
return false, nil, fmt.Errorf("unable to convert action to core.CreateActionImpl")
}
eviction, matched := createAct.Object.(*policy.Eviction)
if !matched {
return false, nil, fmt.Errorf("unable to convert action object into *policy.Eviction")
}
if err := fakeClient.Tracker().Delete(action.GetResource(), eviction.GetNamespace(), eviction.GetName()); err != nil {
return false, nil, fmt.Errorf("unable to delete pod %v/%v: %v", eviction.GetNamespace(), eviction.GetName(), err)
}
return true, nil, nil
}
// fallback to the default reactor
return false, nil, nil
}
}
type tokenReconciliation int
const (
@@ -587,11 +524,6 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
sharedInformerFactory := informers.NewSharedInformerFactoryWithOptions(rs.Client, 0, informers.WithTransform(trimManagedFields))
var nodeSelector string
if deschedulerPolicy.NodeSelector != nil {
nodeSelector = *deschedulerPolicy.NodeSelector
}
var eventClient clientset.Interface
if rs.DryRun {
eventClient = fakeclientset.NewSimpleClientset()
@@ -616,7 +548,8 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
}
}
descheduler, err := newDescheduler(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion, eventRecorder, sharedInformerFactory, namespacedSharedInformerFactory)
// Always create descheduler with real client/factory first to register all informers
descheduler, err := newDescheduler(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion, eventRecorder, rs.Client, sharedInformerFactory, nil)
if err != nil {
span.AddEvent("Failed to create new descheduler", trace.WithAttributes(attribute.String("err", err.Error())))
return err
@@ -624,17 +557,52 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Setup Prometheus provider (only for real client case, not for dry run)
if err := setupPrometheusProvider(descheduler, namespacedSharedInformerFactory); err != nil {
span.AddEvent("Failed to setup Prometheus provider", trace.WithAttributes(attribute.String("err", err.Error())))
return err
}
// If in dry run mode, replace the descheduler with one using fake client/factory
if rs.DryRun {
// Create sandbox with resources to mirror from real client
kubeClientSandbox, err := newDefaultKubeClientSandbox(rs.Client, sharedInformerFactory)
if err != nil {
span.AddEvent("Failed to create kube client sandbox", trace.WithAttributes(attribute.String("err", err.Error())))
return fmt.Errorf("failed to create kube client sandbox: %v", err)
}
klog.V(3).Infof("Building a cached client from the cluster for the dry run")
// TODO(ingvagabund): drop the previous queue
// TODO(ingvagabund): stop the previous pod evictor
// Replace descheduler with one using fake client/factory
descheduler, err = newDescheduler(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion, eventRecorder, kubeClientSandbox.fakeClient(), kubeClientSandbox.fakeSharedInformerFactory(), kubeClientSandbox)
if err != nil {
span.AddEvent("Failed to create dry run descheduler", trace.WithAttributes(attribute.String("err", err.Error())))
return err
}
}
// In dry run mode, start and sync the fake shared informer factory so it can mirror
// events from the real factory. Reliable propagation depends on both factories being
// fully synced (see WaitForCacheSync calls below), not solely on startup order.
if rs.DryRun {
descheduler.kubeClientSandbox.fakeSharedInformerFactory().Start(ctx.Done())
descheduler.kubeClientSandbox.fakeSharedInformerFactory().WaitForCacheSync(ctx.Done())
}
sharedInformerFactory.Start(ctx.Done())
if metricProviderTokenReconciliation == secretReconciliation {
namespacedSharedInformerFactory.Start(ctx.Done())
}
sharedInformerFactory.WaitForCacheSync(ctx.Done())
descheduler.podEvictor.WaitForEventHandlersSync(ctx)
if metricProviderTokenReconciliation == secretReconciliation {
namespacedSharedInformerFactory.WaitForCacheSync(ctx.Done())
}
descheduler.podEvictor.WaitForEventHandlersSync(ctx)
if descheduler.metricsCollector != nil {
go func() {
klog.V(2).Infof("Starting metrics collector")
@@ -666,14 +634,7 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
sCtx, sSpan := tracing.Tracer().Start(ctx, "NonSlidingUntil")
defer sSpan.End()
nodes, err := nodeutil.ReadyNodes(sCtx, rs.Client, descheduler.sharedInformerFactory.Core().V1().Nodes().Lister(), nodeSelector)
if err != nil {
sSpan.AddEvent("Failed to detect ready nodes", trace.WithAttributes(attribute.String("err", err.Error())))
klog.Error(err)
cancel()
return
}
err = descheduler.runDeschedulerLoop(sCtx, nodes)
err = descheduler.runDeschedulerLoop(sCtx)
if err != nil {
sSpan.AddEvent("Failed to run descheduler loop", trace.WithAttributes(attribute.String("err", err.Error())))
klog.Error(err)
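
The hunks above move the dry-run eviction simulation out of runDeschedulerLoop (the removed podEvictionReactionFnc) and into the kubeClientSandbox. The standalone sketch below shows the underlying client-go fake reactor pattern both paths rely on: intercept "create pods/eviction" and delete the pod from the fake tracker so later plugins see the cumulative effect. It assumes only client-go's fake clientset and mirrors the removed reactor rather than the new sandbox code (the evicted-pods cache is omitted).

// Sketch: simulate pod eviction against a fake clientset.
package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	policy "k8s.io/api/policy/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/kubernetes/fake"
	core "k8s.io/client-go/testing"
)

func main() {
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1", Namespace: "default"}}
	client := fake.NewSimpleClientset(pod)

	// Translate an eviction request into a delete on the fake tracker.
	client.PrependReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
		if action.GetSubresource() != "eviction" {
			return false, nil, nil // fall through to the default reactor
		}
		createAct, ok := action.(core.CreateActionImpl)
		if !ok {
			return false, nil, fmt.Errorf("unexpected action type %T", action)
		}
		eviction, ok := createAct.Object.(*policy.Eviction)
		if !ok {
			return false, nil, fmt.Errorf("unexpected object type %T", createAct.Object)
		}
		err := client.Tracker().Delete(action.GetResource(), eviction.GetNamespace(), eviction.GetName())
		return true, nil, err
	})

	// "Evict" the pod and confirm it disappeared from the fake client.
	if err := client.CoreV1().Pods("default").EvictV1(context.TODO(), &policy.Eviction{
		ObjectMeta: metav1.ObjectMeta{Name: "p1", Namespace: "default"},
	}); err != nil {
		panic(err)
	}
	_, err := client.CoreV1().Pods("default").Get(context.TODO(), "p1", metav1.GetOptions{})
	fmt.Println("pod lookup after eviction:", err) // expected: NotFound
}
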

View File

@@ -6,20 +6,27 @@ import (
"fmt"
"math/rand"
"net/http"
"net/url"
"strings"
"testing"
"time"
promapi "github.com/prometheus/client_golang/api"
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
apiversion "k8s.io/apimachinery/pkg/version"
fakediscovery "k8s.io/client-go/discovery/fake"
"k8s.io/client-go/informers"
fakeclientset "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/component-base/featuregate"
"k8s.io/klog/v2"
"k8s.io/metrics/pkg/apis/metrics/v1beta1"
@@ -31,16 +38,32 @@ import (
"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
"sigs.k8s.io/descheduler/pkg/features"
fakeplugin "sigs.k8s.io/descheduler/pkg/framework/fake/plugin"
"sigs.k8s.io/descheduler/pkg/framework/pluginregistry"
"sigs.k8s.io/descheduler/pkg/framework/plugins/defaultevictor"
"sigs.k8s.io/descheduler/pkg/framework/plugins/nodeutilization"
"sigs.k8s.io/descheduler/pkg/framework/plugins/removeduplicates"
"sigs.k8s.io/descheduler/pkg/framework/plugins/removepodsviolatingnodetaints"
frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
"sigs.k8s.io/descheduler/pkg/utils"
deschedulerversion "sigs.k8s.io/descheduler/pkg/version"
"sigs.k8s.io/descheduler/test"
)
type mockPrometheusClient struct {
name string
}
func (m *mockPrometheusClient) URL(ep string, args map[string]string) *url.URL {
return nil
}
func (m *mockPrometheusClient) Do(ctx context.Context, req *http.Request) (*http.Response, []byte, error) {
return nil, nil, nil
}
var _ promapi.Client = &mockPrometheusClient{}
var (
podEvictionError = errors.New("PodEvictionError")
tooManyRequestsError = &apierrors.StatusError{
@@ -177,7 +200,7 @@ func lowNodeUtilizationPolicy(thresholds, targetThresholds api.ResourceThreshold
}
}
func initDescheduler(t *testing.T, ctx context.Context, featureGates featuregate.FeatureGate, internalDeschedulerPolicy *api.DeschedulerPolicy, metricsClient metricsclient.Interface, objects ...runtime.Object) (*options.DeschedulerServer, *descheduler, *fakeclientset.Clientset) {
func initDescheduler(t *testing.T, ctx context.Context, featureGates featuregate.FeatureGate, internalDeschedulerPolicy *api.DeschedulerPolicy, metricsClient metricsclient.Interface, dryRun bool, objects ...runtime.Object) (*options.DeschedulerServer, *descheduler, *fakeclientset.Clientset) {
client := fakeclientset.NewSimpleClientset(objects...)
eventClient := fakeclientset.NewSimpleClientset(objects...)
@@ -189,19 +212,73 @@ func initDescheduler(t *testing.T, ctx context.Context, featureGates featuregate
rs.EventClient = eventClient
rs.DefaultFeatureGates = featureGates
rs.MetricsClient = metricsClient
rs.DryRun = dryRun
sharedInformerFactory := informers.NewSharedInformerFactoryWithOptions(rs.Client, 0, informers.WithTransform(trimManagedFields))
eventBroadcaster, eventRecorder := utils.GetRecorderAndBroadcaster(ctx, client)
descheduler, err := newDescheduler(ctx, rs, internalDeschedulerPolicy, "v1", eventRecorder, sharedInformerFactory, nil)
// Always create descheduler with real client/factory first to register all informers
descheduler, err := newDescheduler(ctx, rs, internalDeschedulerPolicy, "v1", eventRecorder, rs.Client, sharedInformerFactory, nil)
if err != nil {
eventBroadcaster.Shutdown()
t.Fatalf("Unable to create a descheduler instance: %v", err)
t.Fatalf("Unable to create descheduler instance: %v", err)
}
// Setup Prometheus provider (only for real client case, not for dry run)
if err := setupPrometheusProvider(descheduler, nil); err != nil {
eventBroadcaster.Shutdown()
t.Fatalf("Failed to setup Prometheus provider: %v", err)
}
// If in dry run mode, replace the descheduler with one using fake client/factory
if dryRun {
// Create sandbox with resources to mirror from real client
kubeClientSandbox, err := newDefaultKubeClientSandbox(rs.Client, sharedInformerFactory)
if err != nil {
eventBroadcaster.Shutdown()
t.Fatalf("Failed to create kube client sandbox: %v", err)
}
// Replace descheduler with one using fake client/factory
descheduler, err = newDescheduler(ctx, rs, internalDeschedulerPolicy, "v1", eventRecorder, kubeClientSandbox.fakeClient(), kubeClientSandbox.fakeSharedInformerFactory(), kubeClientSandbox)
if err != nil {
eventBroadcaster.Shutdown()
t.Fatalf("Unable to create dry run descheduler instance: %v", err)
}
}
// Start the real shared informer factory after creating the descheduler
if dryRun {
descheduler.kubeClientSandbox.fakeSharedInformerFactory().Start(ctx.Done())
descheduler.kubeClientSandbox.fakeSharedInformerFactory().WaitForCacheSync(ctx.Done())
}
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())
if dryRun {
if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
for _, obj := range objects {
exists, err := descheduler.kubeClientSandbox.hasRuntimeObjectInIndexer(obj)
if err != nil {
return false, err
}
metaObj, err := meta.Accessor(obj)
if err != nil {
return false, fmt.Errorf("failed to get object metadata: %w", err)
}
key := cache.MetaObjectToName(metaObj).String()
if !exists {
klog.Infof("Object %q has not propagated to the indexer", key)
return false, nil
}
klog.Infof("Object %q has propagated to the indexer", key)
}
return true, nil
}); err != nil {
t.Fatalf("nodes did not propagate to the indexer: %v", err)
}
}
return rs, descheduler, client
}
@@ -477,71 +554,92 @@ func taintNodeNoSchedule(node *v1.Node) {
func TestPodEvictorReset(t *testing.T) {
initPluginRegistry()
ctx := context.Background()
node1 := test.BuildTestNode("n1", 2000, 3000, 10, taintNodeNoSchedule)
node2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
nodes := []*v1.Node{node1, node2}
ownerRef1 := test.GetReplicaSetOwnerRefList()
updatePod := func(pod *v1.Pod) {
pod.Namespace = "dev"
pod.ObjectMeta.OwnerReferences = ownerRef1
tests := []struct {
name string
dryRun bool
cycles []struct {
expectedTotalEvicted uint
expectedRealEvictions int
expectedFakeEvictions int
}
}{
{
name: "real mode",
dryRun: false,
cycles: []struct {
expectedTotalEvicted uint
expectedRealEvictions int
expectedFakeEvictions int
}{
{expectedTotalEvicted: 2, expectedRealEvictions: 2, expectedFakeEvictions: 0},
{expectedTotalEvicted: 2, expectedRealEvictions: 4, expectedFakeEvictions: 0},
},
},
{
name: "dry mode",
dryRun: true,
cycles: []struct {
expectedTotalEvicted uint
expectedRealEvictions int
expectedFakeEvictions int
}{
{expectedTotalEvicted: 2, expectedRealEvictions: 0, expectedFakeEvictions: 2},
{expectedTotalEvicted: 2, expectedRealEvictions: 0, expectedFakeEvictions: 4},
},
},
}
p1 := test.BuildTestPod("p1", 100, 0, node1.Name, updatePod)
p2 := test.BuildTestPod("p2", 100, 0, node1.Name, updatePod)
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
node1 := test.BuildTestNode("n1", 2000, 3000, 10, taintNodeNoSchedule)
node2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
internalDeschedulerPolicy := removePodsViolatingNodeTaintsPolicy()
ctxCancel, cancel := context.WithCancel(ctx)
rs, descheduler, client := initDescheduler(t, ctxCancel, initFeatureGates(), internalDeschedulerPolicy, nil, node1, node2, p1, p2)
defer cancel()
p1 := test.BuildTestPod("p1", 100, 0, node1.Name, test.SetRSOwnerRef)
p2 := test.BuildTestPod("p2", 100, 0, node1.Name, test.SetRSOwnerRef)
var evictedPods []string
client.PrependReactor("create", "pods", podEvictionReactionTestingFnc(&evictedPods, nil, nil))
internalDeschedulerPolicy := removePodsViolatingNodeTaintsPolicy()
ctxCancel, cancel := context.WithCancel(ctx)
_, descheduler, client := initDescheduler(t, ctxCancel, initFeatureGates(), internalDeschedulerPolicy, nil, tc.dryRun, node1, node2, p1, p2)
defer cancel()
var fakeEvictedPods []string
descheduler.podEvictionReactionFnc = func(*fakeclientset.Clientset) func(action core.Action) (bool, runtime.Object, error) {
return podEvictionReactionTestingFnc(&fakeEvictedPods, nil, nil)
var evictedPods []string
client.PrependReactor("create", "pods", podEvictionReactionTestingFnc(&evictedPods, nil, nil))
var fakeEvictedPods []string
for i, cycle := range tc.cycles {
evictedPodNames := runDeschedulerLoopAndGetEvictedPods(ctx, t, descheduler, tc.dryRun)
fakeEvictedPods = append(fakeEvictedPods, evictedPodNames...)
if descheduler.podEvictor.TotalEvicted() != cycle.expectedTotalEvicted || len(evictedPods) != cycle.expectedRealEvictions || len(fakeEvictedPods) != cycle.expectedFakeEvictions {
t.Fatalf("Cycle %d: Expected (%v,%v,%v) pods evicted, got (%v,%v,%v) instead", i+1, cycle.expectedTotalEvicted, cycle.expectedRealEvictions, cycle.expectedFakeEvictions, descheduler.podEvictor.TotalEvicted(), len(evictedPods), len(fakeEvictedPods))
}
}
})
}
}
// runDeschedulerLoopAndGetEvictedPods runs a descheduling cycle and returns the names of evicted pods.
// This is similar to runDeschedulerLoop but captures evicted pod names before the cache is reset.
func runDeschedulerLoopAndGetEvictedPods(ctx context.Context, t *testing.T, d *descheduler, dryRun bool) []string {
d.podEvictor.ResetCounters()
d.runProfiles(ctx)
var evictedPodNames []string
if dryRun {
evictedPodsFromCache := d.kubeClientSandbox.evictedPodsCache.list()
for _, pod := range evictedPodsFromCache {
evictedPodNames = append(evictedPodNames, pod.Name)
}
if err := d.kubeClientSandbox.restoreEvictedPods(ctx); err != nil {
t.Fatalf("Failed to restore evicted pods: %v", err)
}
d.kubeClientSandbox.reset()
}
// a single pod eviction expected
klog.Infof("2 pod eviction expected per a descheduling cycle, 2 real evictions in total")
if err := descheduler.runDeschedulerLoop(ctx, nodes); err != nil {
t.Fatalf("Unable to run a descheduling loop: %v", err)
}
if descheduler.podEvictor.TotalEvicted() != 2 || len(evictedPods) != 2 || len(fakeEvictedPods) != 0 {
t.Fatalf("Expected (2,2,0) pods evicted, got (%v, %v, %v) instead", descheduler.podEvictor.TotalEvicted(), len(evictedPods), len(fakeEvictedPods))
}
// a single pod eviction expected
klog.Infof("2 pod eviction expected per a descheduling cycle, 4 real evictions in total")
if err := descheduler.runDeschedulerLoop(ctx, nodes); err != nil {
t.Fatalf("Unable to run a descheduling loop: %v", err)
}
if descheduler.podEvictor.TotalEvicted() != 2 || len(evictedPods) != 4 || len(fakeEvictedPods) != 0 {
t.Fatalf("Expected (2,4,0) pods evicted, got (%v, %v, %v) instead", descheduler.podEvictor.TotalEvicted(), len(evictedPods), len(fakeEvictedPods))
}
// check the fake client syncing and the right pods evicted
klog.Infof("Enabling the dry run mode")
rs.DryRun = true
evictedPods = []string{}
klog.Infof("2 pod eviction expected per a descheduling cycle, 2 fake evictions in total")
if err := descheduler.runDeschedulerLoop(ctx, nodes); err != nil {
t.Fatalf("Unable to run a descheduling loop: %v", err)
}
if descheduler.podEvictor.TotalEvicted() != 2 || len(evictedPods) != 0 || len(fakeEvictedPods) != 2 {
t.Fatalf("Expected (2,0,2) pods evicted, got (%v, %v, %v) instead", descheduler.podEvictor.TotalEvicted(), len(evictedPods), len(fakeEvictedPods))
}
klog.Infof("2 pod eviction expected per a descheduling cycle, 4 fake evictions in total")
if err := descheduler.runDeschedulerLoop(ctx, nodes); err != nil {
t.Fatalf("Unable to run a descheduling loop: %v", err)
}
if descheduler.podEvictor.TotalEvicted() != 2 || len(evictedPods) != 0 || len(fakeEvictedPods) != 4 {
t.Fatalf("Expected (2,0,4) pods evicted, got (%v, %v, %v) instead", descheduler.podEvictor.TotalEvicted(), len(evictedPods), len(fakeEvictedPods))
}
return evictedPodNames
}
func checkTotals(t *testing.T, ctx context.Context, descheduler *descheduler, totalEvictionRequests, totalEvicted uint) {
@@ -555,7 +653,7 @@ func checkTotals(t *testing.T, ctx context.Context, descheduler *descheduler, to
}
func runDeschedulingCycleAndCheckTotals(t *testing.T, ctx context.Context, nodes []*v1.Node, descheduler *descheduler, totalEvictionRequests, totalEvicted uint) {
err := descheduler.runDeschedulerLoop(ctx, nodes)
err := descheduler.runDeschedulerLoop(ctx)
if err != nil {
t.Fatalf("Unable to run a descheduling loop: %v", err)
}
@@ -595,14 +693,9 @@ func TestEvictionRequestsCache(t *testing.T) {
featureGates.Add(map[featuregate.Feature]featuregate.FeatureSpec{
features.EvictionsInBackground: {Default: true, PreRelease: featuregate.Alpha},
})
_, descheduler, client := initDescheduler(t, ctxCancel, featureGates, internalDeschedulerPolicy, nil, node1, node2, p1, p2, p3, p4)
_, descheduler, client := initDescheduler(t, ctxCancel, featureGates, internalDeschedulerPolicy, nil, false, node1, node2, p1, p2, p3, p4)
defer cancel()
var fakeEvictedPods []string
descheduler.podEvictionReactionFnc = func(*fakeclientset.Clientset) func(action core.Action) (bool, runtime.Object, error) {
return podEvictionReactionTestingFnc(&fakeEvictedPods, nil, podEvictionError)
}
var evictedPods []string
client.PrependReactor("create", "pods", podEvictionReactionTestingFnc(&evictedPods, func(name string) bool { return name == "p1" || name == "p2" }, nil))
@@ -731,20 +824,14 @@ func TestDeschedulingLimits(t *testing.T) {
ctx := context.Background()
node1 := test.BuildTestNode("n1", 2000, 3000, 10, taintNodeNoSchedule)
node2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
nodes := []*v1.Node{node1, node2}
ctxCancel, cancel := context.WithCancel(ctx)
featureGates := featuregate.NewFeatureGate()
featureGates.Add(map[featuregate.Feature]featuregate.FeatureSpec{
features.EvictionsInBackground: {Default: true, PreRelease: featuregate.Alpha},
})
_, descheduler, client := initDescheduler(t, ctxCancel, featureGates, tc.policy, nil, node1, node2)
_, descheduler, client := initDescheduler(t, ctxCancel, featureGates, tc.policy, nil, false, node1, node2)
defer cancel()
var fakeEvictedPods []string
descheduler.podEvictionReactionFnc = func(*fakeclientset.Clientset) func(action core.Action) (bool, runtime.Object, error) {
return podEvictionReactionTestingFnc(&fakeEvictedPods, nil, podEvictionError)
}
var evictedPods []string
client.PrependReactor("create", "pods", podEvictionReactionTestingFnc(&evictedPods, func(name string) bool { return name == "p1" || name == "p2" }, nil))
@@ -774,7 +861,7 @@ func TestDeschedulingLimits(t *testing.T) {
time.Sleep(100 * time.Millisecond)
klog.Infof("2 evictions in background expected, 2 normal evictions")
err := descheduler.runDeschedulerLoop(ctx, nodes)
err := descheduler.runDeschedulerLoop(ctx)
if err != nil {
t.Fatalf("Unable to run a descheduling loop: %v", err)
}
@@ -790,6 +877,217 @@ func TestDeschedulingLimits(t *testing.T) {
}
}
func TestNodeLabelSelectorBasedEviction(t *testing.T) {
initPluginRegistry()
// createNodes creates 4 nodes with different labels and applies a taint to all of them
createNodes := func() (*v1.Node, *v1.Node, *v1.Node, *v1.Node) {
taint := []v1.Taint{
{
Key: "test-taint",
Value: "test-value",
Effect: v1.TaintEffectNoSchedule,
},
}
node1 := test.BuildTestNode("n1", 2000, 3000, 10, func(node *v1.Node) {
node.Labels = map[string]string{
"zone": "us-east-1a",
"node-type": "compute",
"environment": "production",
}
node.Spec.Taints = taint
})
node2 := test.BuildTestNode("n2", 2000, 3000, 10, func(node *v1.Node) {
node.Labels = map[string]string{
"zone": "us-east-1b",
"node-type": "compute",
"environment": "production",
}
node.Spec.Taints = taint
})
node3 := test.BuildTestNode("n3", 2000, 3000, 10, func(node *v1.Node) {
node.Labels = map[string]string{
"zone": "us-west-1a",
"node-type": "storage",
"environment": "staging",
}
node.Spec.Taints = taint
})
node4 := test.BuildTestNode("n4", 2000, 3000, 10, func(node *v1.Node) {
node.Labels = map[string]string{
"zone": "us-west-1b",
"node-type": "storage",
"environment": "staging",
}
node.Spec.Taints = taint
})
return node1, node2, node3, node4
}
tests := []struct {
description string
nodeSelector string
dryRun bool
expectedEvictedFromNodes []string
}{
{
description: "Evict from n1, n2",
nodeSelector: "environment=production",
dryRun: false,
expectedEvictedFromNodes: []string{"n1", "n2"},
},
{
description: "Evict from n1, n2 in dry run mode",
nodeSelector: "environment=production",
dryRun: true,
expectedEvictedFromNodes: []string{"n1", "n2"},
},
{
description: "Evict from n3, n4",
nodeSelector: "environment=staging",
dryRun: false,
expectedEvictedFromNodes: []string{"n3", "n4"},
},
{
description: "Evict from n3, n4 in dry run mode",
nodeSelector: "environment=staging",
dryRun: true,
expectedEvictedFromNodes: []string{"n3", "n4"},
},
{
description: "Evict from n1, n4",
nodeSelector: "zone in (us-east-1a, us-west-1b)",
dryRun: false,
expectedEvictedFromNodes: []string{"n1", "n4"},
},
{
description: "Evict from n1, n4 in dry run mode",
nodeSelector: "zone in (us-east-1a, us-west-1b)",
dryRun: true,
expectedEvictedFromNodes: []string{"n1", "n4"},
},
{
description: "Evict from n2, n3",
nodeSelector: "zone in (us-east-1b, us-west-1a)",
dryRun: false,
expectedEvictedFromNodes: []string{"n2", "n3"},
},
{
description: "Evict from n2, n3 in dry run mode",
nodeSelector: "zone in (us-east-1b, us-west-1a)",
dryRun: true,
expectedEvictedFromNodes: []string{"n2", "n3"},
},
{
description: "Evict from all nodes",
nodeSelector: "",
dryRun: false,
expectedEvictedFromNodes: []string{"n1", "n2", "n3", "n4"},
},
{
description: "Evict from all nodes in dry run mode",
nodeSelector: "",
dryRun: true,
expectedEvictedFromNodes: []string{"n1", "n2", "n3", "n4"},
},
}
for _, tc := range tests {
t.Run(tc.description, func(t *testing.T) {
ctx := context.Background()
// Create nodes with different labels and taints
node1, node2, node3, node4 := createNodes()
ownerRef := test.GetReplicaSetOwnerRefList()
updatePod := func(pod *v1.Pod) {
pod.ObjectMeta.OwnerReferences = ownerRef
pod.Status.Phase = v1.PodRunning
}
// Create one pod per node
p1 := test.BuildTestPod("p1", 200, 0, node1.Name, updatePod)
p2 := test.BuildTestPod("p2", 200, 0, node2.Name, updatePod)
p3 := test.BuildTestPod("p3", 200, 0, node3.Name, updatePod)
p4 := test.BuildTestPod("p4", 200, 0, node4.Name, updatePod)
objects := []runtime.Object{node1, node2, node3, node4, p1, p2, p3, p4}
// Map pod names to their node names for validation
podToNode := map[string]string{
"p1": "n1",
"p2": "n2",
"p3": "n3",
"p4": "n4",
}
policy := removePodsViolatingNodeTaintsPolicy()
if tc.nodeSelector != "" {
policy.NodeSelector = &tc.nodeSelector
}
ctxCancel, cancel := context.WithCancel(ctx)
_, deschedulerInstance, client := initDescheduler(t, ctxCancel, initFeatureGates(), policy, nil, tc.dryRun, objects...)
defer cancel()
// Verify all pods are created initially
pods, err := client.CoreV1().Pods(p1.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {
t.Fatalf("Unable to list pods: %v", err)
}
if len(pods.Items) != 4 {
t.Errorf("Expected 4 pods initially, got %d", len(pods.Items))
}
var evictedPods []string
if !tc.dryRun {
client.PrependReactor("create", "pods", podEvictionReactionTestingFnc(&evictedPods, nil, nil))
}
evictedPodNames := runDeschedulerLoopAndGetEvictedPods(ctx, t, deschedulerInstance, tc.dryRun)
if tc.dryRun {
evictedPods = evictedPodNames
}
// Collect which nodes had pods evicted from them
nodesWithEvictedPods := make(map[string]bool)
for _, podName := range evictedPods {
if nodeName, ok := podToNode[podName]; ok {
nodesWithEvictedPods[nodeName] = true
}
}
// Verify the correct number of nodes had pods evicted
if len(nodesWithEvictedPods) != len(tc.expectedEvictedFromNodes) {
t.Fatalf("Expected pods to be evicted from %d nodes, got %d nodes: %v", len(tc.expectedEvictedFromNodes), len(nodesWithEvictedPods), nodesWithEvictedPods)
}
// Verify pods were evicted from the correct nodes
for _, nodeName := range tc.expectedEvictedFromNodes {
if !nodesWithEvictedPods[nodeName] {
t.Fatalf("Expected pod to be evicted from node %s, but it was not", nodeName)
}
}
// Verify no unexpected nodes had pods evicted
for nodeName := range nodesWithEvictedPods {
found := false
for _, expectedNode := range tc.expectedEvictedFromNodes {
if nodeName == expectedNode {
found = true
break
}
}
if !found {
t.Fatalf("Unexpected eviction from node %s", nodeName)
}
}
t.Logf("Successfully evicted pods from nodes: %v", tc.expectedEvictedFromNodes)
})
}
}
func TestLoadAwareDescheduling(t *testing.T) {
initPluginRegistry()
@@ -801,7 +1099,6 @@ func TestLoadAwareDescheduling(t *testing.T) {
ctx := context.Background()
node1 := test.BuildTestNode("n1", 2000, 3000, 10, taintNodeNoSchedule)
node2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
nodes := []*v1.Node{node1, node2}
p1 := test.BuildTestPod("p1", 300, 0, node1.Name, updatePod)
p2 := test.BuildTestPod("p2", 300, 0, node1.Name, updatePod)
@@ -850,6 +1147,7 @@ func TestLoadAwareDescheduling(t *testing.T) {
initFeatureGates(),
policy,
metricsClientset,
false,
node1, node2, p1, p2, p3, p4, p5)
defer cancel()
@@ -857,7 +1155,7 @@ func TestLoadAwareDescheduling(t *testing.T) {
// after newDescheduler in RunDeschedulerStrategies.
descheduler.metricsCollector.Collect(ctx)
err := descheduler.runDeschedulerLoop(ctx, nodes)
err := descheduler.runDeschedulerLoop(ctx)
if err != nil {
t.Fatalf("Unable to run a descheduling loop: %v", err)
}
@@ -867,3 +1165,407 @@ func TestLoadAwareDescheduling(t *testing.T) {
}
t.Logf("Total evictions: %v", totalEs)
}
func TestPodEvictionReactionFncErrorHandling(t *testing.T) {
podsGVR := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
testCases := []struct {
name string
setupFnc func(*fakeclientset.Clientset) (name, namespace string)
expectHandled bool
expectError bool
errorContains string
expectedCacheLen int
}{
{
name: "handles pod eviction successfully and adds to cache",
setupFnc: func(fakeClient *fakeclientset.Clientset) (string, string) {
pod := test.BuildTestPod("pod1", 100, 0, "node1", test.SetRSOwnerRef)
err := fakeClient.Tracker().Add(pod)
if err != nil {
t.Fatalf("Failed to add pod: %v", err)
}
return pod.Name, pod.Namespace
},
expectHandled: true,
expectError: false,
expectedCacheLen: 1,
},
{
name: "returns false and error when delete fails allowing other reactors to handle",
setupFnc: func(fakeClient *fakeclientset.Clientset) (string, string) {
pod := test.BuildTestPod("pod1", 100, 0, "node1", test.SetRSOwnerRef)
if err := fakeClient.Tracker().Add(pod); err != nil {
t.Fatalf("Failed to add pod: %v", err)
}
if err := fakeClient.Tracker().Delete(podsGVR, pod.Namespace, pod.Name); err != nil {
t.Fatalf("Failed to pre-delete pod: %v", err)
}
return pod.Name, pod.Namespace
},
expectHandled: false,
expectError: true,
errorContains: "unable to delete pod",
expectedCacheLen: 0,
},
{
name: "returns error when pod doesn't exist in tracker from the start",
setupFnc: func(fakeClient *fakeclientset.Clientset) (string, string) {
// Don't add the pod to the tracker at all
return "nonexistent-pod", "default"
},
expectHandled: false,
expectError: true,
errorContains: "unable to delete pod",
expectedCacheLen: 0,
},
{
name: "returns error when object is not a pod",
setupFnc: func(fakeClient *fakeclientset.Clientset) (string, string) {
configMap := &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "test-config",
Namespace: "default",
},
}
if err := fakeClient.Tracker().Create(podsGVR, configMap, "default"); err != nil {
t.Fatalf("Failed to add ConfigMap to pods resource: %v", err)
}
return configMap.Name, configMap.Namespace
},
expectHandled: false,
expectError: true,
errorContains: "unable to convert object to *v1.Pod",
expectedCacheLen: 0,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
fakeClient := fakeclientset.NewSimpleClientset()
cache := newEvictedPodsCache()
name, namespace := tc.setupFnc(fakeClient)
reactionFnc := podEvictionReactionFnc(fakeClient, cache)
handled, _, err := reactionFnc(core.NewCreateSubresourceAction(
podsGVR,
name,
"eviction",
namespace,
&policy.Eviction{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
},
))
if handled != tc.expectHandled {
t.Errorf("Expected handled=%v, got %v", tc.expectHandled, handled)
}
if tc.expectError {
if err == nil {
t.Fatal("Expected error, got nil")
}
if !strings.Contains(err.Error(), tc.errorContains) {
t.Errorf("Expected error message to contain '%s', got: %v", tc.errorContains, err)
}
} else {
if err != nil {
t.Errorf("Expected no error, got: %v", err)
}
}
if len(cache.list()) != tc.expectedCacheLen {
t.Errorf("Expected %d pods in cache, got %d", tc.expectedCacheLen, len(cache.list()))
}
})
}
}
// verifyPodIdentityFields checks if name, namespace, and UID match expected values
func verifyPodIdentityFields(t *testing.T, name, namespace, uid, expectedName, expectedNamespace, expectedUID, context string) {
t.Helper()
if name != expectedName {
t.Fatalf("Expected pod name %s%s, got %s", expectedName, context, name)
}
if namespace != expectedNamespace {
t.Fatalf("Expected pod namespace %s%s, got %s", expectedNamespace, context, namespace)
}
if uid != expectedUID {
t.Fatalf("Expected pod UID %s%s, got %s", expectedUID, context, uid)
}
}
// verifyPodIdentity checks if a pod has the expected name, namespace, and UID
func verifyPodIdentity(t *testing.T, pod *v1.Pod, expectedName, expectedNamespace string, expectedUID types.UID) {
t.Helper()
verifyPodIdentityFields(t, pod.Name, pod.Namespace, string(pod.UID), expectedName, expectedNamespace, string(expectedUID), "")
}
func TestEvictedPodRestorationInDryRun(t *testing.T) {
// Initialize klog flags
// klog.InitFlags(nil)
// Set verbosity level (higher number = more verbose)
// 0 = errors only, 1-4 = info, 5-9 = debug, 10+ = trace
// flag.Set("v", "4")
initPluginRegistry()
ctx := context.Background()
node1 := test.BuildTestNode("n1", 2000, 3000, 10, taintNodeNoSchedule)
node2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
p1 := test.BuildTestPod("p1", 100, 0, node1.Name, test.SetRSOwnerRef)
internalDeschedulerPolicy := removePodsViolatingNodeTaintsPolicy()
ctxCancel, cancel := context.WithCancel(ctx)
defer cancel()
// Create descheduler with DryRun mode
client := fakeclientset.NewSimpleClientset(node1, node2, p1)
eventClient := fakeclientset.NewSimpleClientset(node1, node2, p1)
rs, err := options.NewDeschedulerServer()
if err != nil {
t.Fatalf("Unable to initialize server: %v", err)
}
rs.Client = client
rs.EventClient = eventClient
rs.DefaultFeatureGates = initFeatureGates()
rs.DryRun = true // Set DryRun before creating descheduler
sharedInformerFactory := informers.NewSharedInformerFactoryWithOptions(rs.Client, 0, informers.WithTransform(trimManagedFields))
eventBroadcaster, eventRecorder := utils.GetRecorderAndBroadcaster(ctxCancel, client)
defer eventBroadcaster.Shutdown()
// Always create descheduler with real client/factory first to register all informers
descheduler, err := newDescheduler(ctxCancel, rs, internalDeschedulerPolicy, "v1", eventRecorder, rs.Client, sharedInformerFactory, nil)
if err != nil {
t.Fatalf("Unable to create descheduler instance: %v", err)
}
sharedInformerFactory.Start(ctxCancel.Done())
sharedInformerFactory.WaitForCacheSync(ctxCancel.Done())
// Create sandbox with resources to mirror from real client
kubeClientSandbox, err := newDefaultKubeClientSandbox(rs.Client, sharedInformerFactory)
if err != nil {
t.Fatalf("Failed to create kube client sandbox: %v", err)
}
// Replace descheduler with one using fake client/factory
descheduler, err = newDescheduler(ctxCancel, rs, internalDeschedulerPolicy, "v1", eventRecorder, kubeClientSandbox.fakeClient(), kubeClientSandbox.fakeSharedInformerFactory(), kubeClientSandbox)
if err != nil {
t.Fatalf("Unable to create dry run descheduler instance: %v", err)
}
// Start and sync the fake factory after creating the descheduler
kubeClientSandbox.fakeSharedInformerFactory().Start(ctxCancel.Done())
kubeClientSandbox.fakeSharedInformerFactory().WaitForCacheSync(ctxCancel.Done())
// Verify the pod exists in the fake client after initialization
pod, err := kubeClientSandbox.fakeClient().CoreV1().Pods(p1.Namespace).Get(ctx, p1.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Expected pod %s to exist in fake client after initialization, but got error: %v", p1.Name, err)
}
verifyPodIdentity(t, pod, p1.Name, p1.Namespace, p1.UID)
klog.Infof("Pod %s exists in fake client after initialization", p1.Name)
// Run two descheduling cycles to verify pod eviction and restoration works repeatedly
for i := 1; i <= 2; i++ {
// Run descheduling cycle
klog.Infof("Running descheduling cycle %d", i)
descheduler.podEvictor.ResetCounters()
descheduler.runProfiles(ctx)
// Verify the pod was evicted (should not exist in fake client anymore)
_, err = kubeClientSandbox.fakeClient().CoreV1().Pods(p1.Namespace).Get(ctx, p1.Name, metav1.GetOptions{})
if err == nil {
t.Fatalf("Expected pod %s to be evicted from fake client in cycle %d, but it still exists", p1.Name, i)
}
if !apierrors.IsNotFound(err) {
t.Fatalf("Expected NotFound error for pod %s in cycle %d, got: %v", p1.Name, i, err)
}
klog.Infof("Pod %s was successfully evicted from fake client in cycle %d", p1.Name, i)
// Verify the pod was added to the evicted pods cache
evictedPods := descheduler.kubeClientSandbox.evictedPodsCache.list()
if len(evictedPods) != 1 {
t.Fatalf("Expected 1 pod in evicted cache in cycle %d, got %d", i, len(evictedPods))
}
verifyPodIdentityFields(t, evictedPods[0].Name, evictedPods[0].Namespace, evictedPods[0].UID, p1.Name, p1.Namespace, string(p1.UID), fmt.Sprintf(" in cycle %d", i))
klog.Infof("Pod %s was successfully added to evicted pods cache in cycle %d (UID: %s)", p1.Name, i, p1.UID)
// Restore evicted pods
klog.Infof("Restoring evicted pods from cache in cycle %d", i)
if err := descheduler.kubeClientSandbox.restoreEvictedPods(ctx); err != nil {
t.Fatalf("Failed to restore evicted pods in cycle %d: %v", i, err)
}
descheduler.kubeClientSandbox.evictedPodsCache.clear()
// Verify the pod was restored back to the fake client
pod, err = kubeClientSandbox.fakeClient().CoreV1().Pods(p1.Namespace).Get(ctx, p1.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Expected pod %s to be restored to fake client in cycle %d, but got error: %v", p1.Name, i, err)
}
verifyPodIdentity(t, pod, p1.Name, p1.Namespace, p1.UID)
klog.Infof("Pod %s was successfully restored to fake client in cycle %d (UID: %s)", p1.Name, i, pod.UID)
// Verify cache was cleared after restoration
evictedPods = descheduler.kubeClientSandbox.evictedPodsCache.list()
if len(evictedPods) != 0 {
t.Fatalf("Expected evicted cache to be empty after restoration in cycle %d, got %d pods", i, len(evictedPods))
}
klog.Infof("Evicted pods cache was cleared after restoration in cycle %d", i)
}
}
// verifyAllPrometheusClientsEqual checks that all Prometheus client variables are equal to the expected value
func verifyAllPrometheusClientsEqual(t *testing.T, expected, fromReactor, fromPluginHandle, fromDescheduler promapi.Client) {
t.Helper()
if fromReactor != expected {
t.Fatalf("Prometheus client from reactor: expected %v, got %v", expected, fromReactor)
}
if fromPluginHandle != expected {
t.Fatalf("Prometheus client from plugin handle: expected %v, got %v", expected, fromPluginHandle)
}
if fromDescheduler != expected {
t.Fatalf("Prometheus client from descheduler: expected %v, got %v", expected, fromDescheduler)
}
t.Logf("All Prometheus clients variables correctly set to: %v", expected)
}
// TestPluginPrometheusClientAccess tests that the Prometheus client is accessible through the plugin handle
func TestPluginPrometheusClientAccess(t *testing.T) {
testCases := []struct {
name string
dryRun bool
}{
{
name: "dry run disabled",
dryRun: false,
},
{
name: "dry run enabled",
dryRun: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
initPluginRegistry()
newInvoked := false
reactorInvoked := false
var prometheusClientFromPluginNewHandle promapi.Client
var prometheusClientFromReactor promapi.Client
fakePlugin := &fakeplugin.FakePlugin{
PluginName: "TestPluginWithPrometheusClient",
}
fakePlugin.AddReactor(string(frameworktypes.DescheduleExtensionPoint), func(action fakeplugin.Action) (handled, filter bool, err error) {
if dAction, ok := action.(fakeplugin.DescheduleAction); ok {
reactorInvoked = true
prometheusClientFromReactor = dAction.Handle().PrometheusClient()
return true, false, nil
}
return false, false, nil
})
pluginregistry.Register(
fakePlugin.PluginName,
fakeplugin.NewPluginFncFromFakeWithReactor(fakePlugin, func(action fakeplugin.ActionImpl) {
newInvoked = true
prometheusClientFromPluginNewHandle = action.Handle().PrometheusClient()
}),
&fakeplugin.FakePlugin{},
&fakeplugin.FakePluginArgs{},
fakeplugin.ValidateFakePluginArgs,
fakeplugin.SetDefaults_FakePluginArgs,
pluginregistry.PluginRegistry,
)
deschedulerPolicy := &api.DeschedulerPolicy{
Profiles: []api.DeschedulerProfile{
{
Name: "test-profile",
PluginConfigs: []api.PluginConfig{
{
Name: fakePlugin.PluginName,
Args: &fakeplugin.FakePluginArgs{},
},
},
Plugins: api.Plugins{
Deschedule: api.PluginSet{
Enabled: []string{fakePlugin.PluginName},
},
},
},
},
}
node1 := test.BuildTestNode("node1", 1000, 2000, 9, nil)
node2 := test.BuildTestNode("node2", 1000, 2000, 9, nil)
_, descheduler, _ := initDescheduler(t, ctx, initFeatureGates(), deschedulerPolicy, nil, tc.dryRun, node1, node2)
// Test cycles with different Prometheus client values
cycles := []struct {
name string
client promapi.Client
}{
{
name: "initial client",
client: &mockPrometheusClient{name: "new-init-client"},
},
{
name: "nil client",
client: nil,
},
{
name: "new client",
client: &mockPrometheusClient{name: "new-client"},
},
{
name: "another client",
client: &mockPrometheusClient{name: "another-client"},
},
}
for i, cycle := range cycles {
t.Logf("Cycle %d: %s", i+1, cycle.name)
// Set the descheduler's Prometheus client
t.Logf("Setting descheduler.prometheusClient from %v to %v", descheduler.prometheusClient, cycle.client)
descheduler.prometheusClient = cycle.client
newInvoked = false
reactorInvoked = false
prometheusClientFromPluginNewHandle = nil
prometheusClientFromReactor = nil
descheduler.runProfiles(ctx)
t.Logf("After cycle %d: prometheusClientFromReactor=%v, descheduler.prometheusClient=%v", i+1, prometheusClientFromReactor, descheduler.prometheusClient)
if !newInvoked {
t.Fatalf("Expected plugin new to be invoked during cycle %d", i+1)
}
if !reactorInvoked {
t.Fatalf("Expected deschedule reactor to be invoked during cycle %d", i+1)
}
verifyAllPrometheusClientsEqual(t, cycle.client, prometheusClientFromReactor, prometheusClientFromPluginNewHandle, descheduler.prometheusClient)
}
})
}
}

View File

@@ -403,12 +403,6 @@ func (pe *PodEvictor) ResetCounters() {
pe.totalPodCount = 0
}
func (pe *PodEvictor) SetClient(client clientset.Interface) {
pe.mu.Lock()
defer pe.mu.Unlock()
pe.client = client
}
func (pe *PodEvictor) evictionRequestsTotal() uint {
if pe.featureGates.Enabled(features.EvictionsInBackground) {
return pe.erCache.evictionRequestsTotal()

View File

@@ -0,0 +1,446 @@
/*
Copyright 2026 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package descheduler
import (
"context"
"fmt"
"sync"
"time"
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
policyv1 "k8s.io/api/policy/v1"
schedulingv1 "k8s.io/api/scheduling/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
fakeclientset "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)
// evictedPodInfo stores identifying information about a pod that was evicted during dry-run mode
type evictedPodInfo struct {
Namespace string
Name string
UID string
}
// evictedPodsCache is a thread-safe cache for tracking pods evicted during dry-run mode
type evictedPodsCache struct {
sync.RWMutex
pods map[string]*evictedPodInfo
}
func newEvictedPodsCache() *evictedPodsCache {
return &evictedPodsCache{
pods: make(map[string]*evictedPodInfo),
}
}
func (c *evictedPodsCache) add(pod *v1.Pod) {
c.Lock()
defer c.Unlock()
c.pods[string(pod.UID)] = &evictedPodInfo{
Namespace: pod.Namespace,
Name: pod.Name,
UID: string(pod.UID),
}
}
func (c *evictedPodsCache) list() []*evictedPodInfo {
c.RLock()
defer c.RUnlock()
pods := make([]*evictedPodInfo, 0, len(c.pods))
for _, pod := range c.pods {
podCopy := *pod
pods = append(pods, &podCopy)
}
return pods
}
func (c *evictedPodsCache) clear() {
c.Lock()
defer c.Unlock()
c.pods = make(map[string]*evictedPodInfo)
}
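// Entries are keyed by pod UID, so adding a pod twice with the same UID overwrites the
// earlier entry, and list() returns copies of the stored entries rather than references
// into the cache.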
// kubeClientSandbox creates a sandbox environment with a fake client and informer factory
// that mirrors resources from a real client, useful for dry-run testing scenarios
type kubeClientSandbox struct {
fakeKubeClient *fakeclientset.Clientset
fakeFactory informers.SharedInformerFactory
resourceToInformer map[schema.GroupVersionResource]informers.GenericInformer
evictedPodsCache *evictedPodsCache
podEvictionReactionFnc func(*fakeclientset.Clientset, *evictedPodsCache) func(action core.Action) (bool, runtime.Object, error)
}
func newDefaultKubeClientSandbox(client clientset.Interface, sharedInformerFactory informers.SharedInformerFactory) (*kubeClientSandbox, error) {
return newKubeClientSandbox(client, sharedInformerFactory,
v1.SchemeGroupVersion.WithResource("pods"),
v1.SchemeGroupVersion.WithResource("nodes"),
v1.SchemeGroupVersion.WithResource("namespaces"),
schedulingv1.SchemeGroupVersion.WithResource("priorityclasses"),
policyv1.SchemeGroupVersion.WithResource("poddisruptionbudgets"),
v1.SchemeGroupVersion.WithResource("persistentvolumeclaims"),
)
}
func newKubeClientSandbox(client clientset.Interface, sharedInformerFactory informers.SharedInformerFactory, resources ...schema.GroupVersionResource) (*kubeClientSandbox, error) {
sandbox := &kubeClientSandbox{
resourceToInformer: make(map[schema.GroupVersionResource]informers.GenericInformer),
evictedPodsCache: newEvictedPodsCache(),
podEvictionReactionFnc: podEvictionReactionFnc,
}
sandbox.fakeKubeClient = fakeclientset.NewSimpleClientset()
// simulate a pod eviction by deleting a pod
sandbox.fakeKubeClient.PrependReactor("create", "pods", sandbox.podEvictionReactionFnc(sandbox.fakeKubeClient, sandbox.evictedPodsCache))
sandbox.fakeFactory = informers.NewSharedInformerFactory(sandbox.fakeKubeClient, 0)
for _, resource := range resources {
informer, err := sharedInformerFactory.ForResource(resource)
if err != nil {
return nil, err
}
sandbox.resourceToInformer[resource] = informer
}
// Register event handlers to sync changes from real client to fake client.
// These handlers will keep the fake client in sync with ongoing changes.
if err := sandbox.registerEventHandlers(); err != nil {
return nil, fmt.Errorf("error registering event handlers: %w", err)
}
return sandbox, nil
}
func (sandbox *kubeClientSandbox) registerEventHandlers() error {
for resource, informer := range sandbox.resourceToInformer {
// Create a local copy to avoid closure capture issue
resource := resource
_, err := sandbox.fakeFactory.ForResource(resource)
if err != nil {
return fmt.Errorf("error getting resource %s for fake factory: %w", resource, err)
}
_, err = informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
runtimeObj, ok := obj.(runtime.Object)
if !ok {
klog.ErrorS(nil, "object is not a runtime.Object", "resource", resource)
return
}
if err := sandbox.fakeKubeClient.Tracker().Add(runtimeObj); err != nil {
if !apierrors.IsAlreadyExists(err) {
klog.ErrorS(err, "failed to add object to fake client", "resource", resource)
}
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
runtimeObj, ok := newObj.(runtime.Object)
if !ok {
klog.ErrorS(nil, "object is not a runtime.Object", "resource", resource)
return
}
metaObj, err := meta.Accessor(runtimeObj)
if err != nil {
klog.ErrorS(err, "failed to get object metadata", "resource", resource)
return
}
if err := sandbox.fakeKubeClient.Tracker().Update(resource, runtimeObj, metaObj.GetNamespace()); err != nil {
klog.ErrorS(err, "failed to update object in fake client", "resource", resource)
}
},
DeleteFunc: func(obj interface{}) {
// Handle tombstone case where the object might be wrapped
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = tombstone.Obj
}
runtimeObj, ok := obj.(runtime.Object)
if !ok {
klog.ErrorS(nil, "object is not a runtime.Object", "resource", resource)
return
}
metaObj, err := meta.Accessor(runtimeObj)
if err != nil {
klog.ErrorS(err, "failed to get object metadata", "resource", resource)
return
}
if err := sandbox.fakeKubeClient.Tracker().Delete(resource, metaObj.GetNamespace(), metaObj.GetName()); err != nil {
klog.ErrorS(err, "failed to delete object from fake client", "resource", resource)
}
},
})
if err != nil {
return fmt.Errorf("error adding event handler for resource %s: %w", resource, err)
}
}
return nil
}
func (sandbox *kubeClientSandbox) fakeClient() *fakeclientset.Clientset {
return sandbox.fakeKubeClient
}
func (sandbox *kubeClientSandbox) fakeSharedInformerFactory() informers.SharedInformerFactory {
return sandbox.fakeFactory
}
func (sandbox *kubeClientSandbox) reset() {
sandbox.evictedPodsCache.clear()
}
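// A rough sketch of how a dry-run cycle uses the sandbox (variable names are placeholders):
//
//	sandbox, err := newDefaultKubeClientSandbox(realClient, realFactory)
//	if err != nil {
//		return err
//	}
//	sandbox.fakeSharedInformerFactory().Start(ctx.Done())
//	sandbox.fakeSharedInformerFactory().WaitForCacheSync(ctx.Done())
//	// ... run a descheduling cycle against sandbox.fakeClient() ...
//	if err := sandbox.restoreEvictedPods(ctx); err != nil {
//		return err
//	}
//	sandbox.reset()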
// hasObjectInIndexer checks if an object exists in the fake indexer for the specified resource
func (sandbox *kubeClientSandbox) hasObjectInIndexer(resource schema.GroupVersionResource, namespace, name, uid string) (bool, error) {
informer, err := sandbox.fakeFactory.ForResource(resource)
if err != nil {
return false, fmt.Errorf("error getting informer for resource %s: %w", resource, err)
}
key := cache.MetaObjectToName(&metav1.ObjectMeta{Namespace: namespace, Name: name}).String()
obj, exists, err := informer.Informer().GetIndexer().GetByKey(key)
if err != nil {
return false, err
}
if !exists {
return false, nil
}
metaObj, err := meta.Accessor(obj)
if err != nil {
return false, fmt.Errorf("failed to get object metadata: %w", err)
}
return string(metaObj.GetUID()) == uid, nil
}
// hasRuntimeObjectInIndexer checks if a runtime.Object exists in the fake indexer by detecting its resource type
func (sandbox *kubeClientSandbox) hasRuntimeObjectInIndexer(obj runtime.Object) (bool, error) {
// Get metadata accessor to extract namespace and name
metaObj, err := meta.Accessor(obj)
if err != nil {
return false, fmt.Errorf("failed to get object metadata: %w", err)
}
// Get the GVK from the object using TypeMeta
gvk := obj.GetObjectKind().GroupVersionKind()
if gvk.Empty() {
return false, fmt.Errorf("no GroupVersionKind found for object")
}
// Use the GVK to construct the GVR by pluralizing the kind
plural, _ := meta.UnsafeGuessKindToResource(gvk)
gvr := schema.GroupVersionResource{
Group: gvk.Group,
Version: gvk.Version,
Resource: plural.Resource,
}
return sandbox.hasObjectInIndexer(gvr, metaObj.GetNamespace(), metaObj.GetName(), string(metaObj.GetUID()))
}
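// For example, a *v1.Pod whose TypeMeta carries APIVersion "v1" and Kind "Pod" is guessed
// to the "pods" resource. Objects with an empty GroupVersionKind (common for objects
// returned by typed clients) cannot be resolved and yield an error instead.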
// shouldSkipPodWait checks if a pod still exists in the fake client with the expected UID.
// Returns (shouldSkip, error). shouldSkip is true if we should skip waiting for this pod.
// Returns an error for unexpected failures (non-NotFound errors).
func (sandbox *kubeClientSandbox) shouldSkipPodWait(ctx context.Context, pod *evictedPodInfo) (bool, error) {
// Check if the pod still exists in the fake client (it could have been deleted from the real client
// and the deletion propagated via event handlers in the meantime)
fakePod, err := sandbox.fakeClient().CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
klog.V(3).InfoS("Pod no longer exists in fake client, skipping wait", "namespace", pod.Namespace, "name", pod.Name)
return true, nil
}
return false, fmt.Errorf("error getting pod %s/%s from fake client: %w", pod.Namespace, pod.Name, err)
}
if string(fakePod.UID) != pod.UID {
klog.V(3).InfoS("Pod UID mismatch in fake client, skipping wait", "namespace", pod.Namespace, "name", pod.Name, "expectedUID", pod.UID, "actualUID", string(fakePod.UID))
return true, nil
}
return false, nil
}
func waitForPodsCondition(ctx context.Context, pods []*evictedPodInfo, checkFn func(*evictedPodInfo) (bool, error), successMsg string) error {
if len(pods) == 0 {
return nil
}
err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
for _, pod := range pods {
satisfied, err := checkFn(pod)
if err != nil {
return false, err
}
if !satisfied {
return false, nil
}
}
return true, nil
})
if err != nil {
return err
}
klog.V(4).InfoS(successMsg)
return nil
}
// restoreEvictedPods restores pods from the evicted pods cache back to the fake client
func (sandbox *kubeClientSandbox) restoreEvictedPods(ctx context.Context) error {
podInformer, ok := sandbox.resourceToInformer[v1.SchemeGroupVersion.WithResource("pods")]
if !ok {
return fmt.Errorf("pod informer not found in resourceToInformer")
}
evictedPods := sandbox.evictedPodsCache.list()
// First wait loop: Check that all evicted pods are cleared from the indexers.
// This ensures the eviction has fully propagated through the fake informer's indexer.
// We check both existence and UID match to handle cases where a pod was deleted and
// recreated with the same name but different UID.
if err := waitForPodsCondition(ctx, evictedPods, func(pod *evictedPodInfo) (bool, error) {
exists, err := sandbox.hasObjectInIndexer(v1.SchemeGroupVersion.WithResource("pods"), pod.Namespace, pod.Name, pod.UID)
if err != nil {
klog.V(4).InfoS("Error checking indexer for pod", "namespace", pod.Namespace, "name", pod.Name, "error", err)
return false, nil
}
if exists {
klog.V(4).InfoS("Pod with matching UID still exists in fake indexer, waiting", "namespace", pod.Namespace, "name", pod.Name, "uid", pod.UID)
return false, nil
}
klog.V(4).InfoS("Pod with matching UID no longer in fake indexer", "namespace", pod.Namespace, "name", pod.Name, "uid", pod.UID)
return true, nil
}, "All evicted pods removed from fake indexer"); err != nil {
return fmt.Errorf("timeout waiting for evicted pods to be removed from fake indexer: %w", err)
}
var restoredPods []*evictedPodInfo
for _, podInfo := range sandbox.evictedPodsCache.list() {
obj, err := podInformer.Lister().ByNamespace(podInfo.Namespace).Get(podInfo.Name)
if err != nil {
klog.V(3).InfoS("Pod not found in real client, skipping restoration", "namespace", podInfo.Namespace, "name", podInfo.Name, "error", err)
continue
}
pod, ok := obj.(*v1.Pod)
if !ok {
klog.ErrorS(nil, "Object is not a pod", "namespace", podInfo.Namespace, "name", podInfo.Name)
continue
}
if string(pod.UID) != podInfo.UID {
klog.V(3).InfoS("Pod UID mismatch, skipping restoration", "namespace", podInfo.Namespace, "name", podInfo.Name, "expectedUID", podInfo.UID, "actualUID", string(pod.UID))
continue
}
if err = sandbox.fakeKubeClient.Tracker().Add(pod); err != nil && !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("failed to restore pod %s/%s to fake client: %w", podInfo.Namespace, podInfo.Name, err)
}
klog.V(4).InfoS("Successfully restored pod to fake client", "namespace", podInfo.Namespace, "name", podInfo.Name, "uid", podInfo.UID)
restoredPods = append(restoredPods, podInfo)
}
// Second wait loop: Make sure the evicted pods are added back to the fake client.
// This ensures the restored pods are accessible through the fake informer's lister.
if err := waitForPodsCondition(ctx, restoredPods, func(pod *evictedPodInfo) (bool, error) {
podObj, err := sandbox.fakeFactory.Core().V1().Pods().Lister().Pods(pod.Namespace).Get(pod.Name)
if err != nil {
klog.V(4).InfoS("Pod not yet accessible in fake informer, waiting", "namespace", pod.Namespace, "name", pod.Name)
shouldSkip, checkErr := sandbox.shouldSkipPodWait(ctx, pod)
if checkErr != nil {
return false, checkErr
}
if shouldSkip {
return true, nil
}
return false, nil
}
klog.V(4).InfoS("Pod accessible in fake informer", "namespace", pod.Namespace, "name", pod.Name, "node", podObj.Spec.NodeName)
return true, nil
}, "All restored pods are accessible in fake informer"); err != nil {
return fmt.Errorf("timeout waiting for pods to be accessible in fake informer: %w", err)
}
// Third wait loop: Make sure the indexers see the added pods.
// This is important to ensure each descheduling cycle can see all the restored pods.
// Without this wait, the next cycle might not see the restored pods in the indexer yet.
if err := waitForPodsCondition(ctx, restoredPods, func(pod *evictedPodInfo) (bool, error) {
exists, err := sandbox.hasObjectInIndexer(v1.SchemeGroupVersion.WithResource("pods"), pod.Namespace, pod.Name, pod.UID)
if err != nil {
klog.V(4).InfoS("Error checking indexer for restored pod", "namespace", pod.Namespace, "name", pod.Name, "error", err)
return false, nil
}
if !exists {
klog.V(4).InfoS("Restored pod not yet in fake indexer, waiting", "namespace", pod.Namespace, "name", pod.Name)
shouldSkip, checkErr := sandbox.shouldSkipPodWait(ctx, pod)
if checkErr != nil {
return false, checkErr
}
if shouldSkip {
return true, nil
}
return false, nil
}
klog.V(4).InfoS("Restored pod now in fake indexer", "namespace", pod.Namespace, "name", pod.Name)
return true, nil
}, "All restored pods are now in fake indexer"); err != nil {
return fmt.Errorf("timeout waiting for restored pods to appear in fake indexer: %w", err)
}
return nil
}
func podEvictionReactionFnc(fakeClient *fakeclientset.Clientset, evictedCache *evictedPodsCache) func(action core.Action) (bool, runtime.Object, error) {
return func(action core.Action) (bool, runtime.Object, error) {
if action.GetSubresource() == "eviction" {
createAct, matched := action.(core.CreateActionImpl)
if !matched {
return false, nil, fmt.Errorf("unable to convert action to core.CreateActionImpl")
}
eviction, matched := createAct.Object.(*policy.Eviction)
if !matched {
return false, nil, fmt.Errorf("unable to convert action object into *policy.Eviction")
}
podObj, err := fakeClient.Tracker().Get(action.GetResource(), eviction.GetNamespace(), eviction.GetName())
if err == nil {
if pod, ok := podObj.(*v1.Pod); ok {
evictedCache.add(pod)
} else {
return false, nil, fmt.Errorf("unable to convert object to *v1.Pod for %v/%v", eviction.GetNamespace(), eviction.GetName())
}
} else if !apierrors.IsNotFound(err) {
return false, nil, fmt.Errorf("unable to get pod %v/%v: %v", eviction.GetNamespace(), eviction.GetName(), err)
}
if err := fakeClient.Tracker().Delete(action.GetResource(), eviction.GetNamespace(), eviction.GetName()); err != nil {
return false, nil, fmt.Errorf("unable to delete pod %v/%v: %v", eviction.GetNamespace(), eviction.GetName(), err)
}
return true, nil, nil
}
// fall back to the default reactor
return false, nil, nil
}
}

View File

@@ -0,0 +1,582 @@
/*
Copyright 2026 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package descheduler
import (
"context"
"testing"
"time"
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
fakeclientset "k8s.io/client-go/kubernetes/fake"
"sigs.k8s.io/descheduler/test"
)
// setupTestSandbox creates and initializes a kubeClientSandbox for testing.
// It creates a fake client, informer factory, registers pod and node informers,
// creates the sandbox, and starts both factories.
func setupTestSandbox(ctx context.Context, t *testing.T, initialObjects ...runtime.Object) (*kubeClientSandbox, informers.SharedInformerFactory, clientset.Interface) {
// Create a "real" fake client to act as the source of truth
realClient := fakeclientset.NewSimpleClientset(initialObjects...)
realFactory := informers.NewSharedInformerFactoryWithOptions(realClient, 0)
// Register pods and nodes informers BEFORE creating the sandbox
_ = realFactory.Core().V1().Pods().Informer()
_ = realFactory.Core().V1().Nodes().Informer()
// Create the sandbox with only pods and nodes resources
sandbox, err := newKubeClientSandbox(realClient, realFactory,
v1.SchemeGroupVersion.WithResource("pods"),
v1.SchemeGroupVersion.WithResource("nodes"),
)
if err != nil {
t.Fatalf("Failed to create kubeClientSandbox: %v", err)
}
// The fake factory created by newKubeClientSandbox needs to be started before
// the "real" client's informer factory so that all event handlers are registered
// and changes propagate completely to the fake client
sandbox.fakeSharedInformerFactory().Start(ctx.Done())
sandbox.fakeSharedInformerFactory().WaitForCacheSync(ctx.Done())
realFactory.Start(ctx.Done())
realFactory.WaitForCacheSync(ctx.Done())
return sandbox, realFactory, realClient
}
func TestKubeClientSandboxEventHandlers(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
sandbox, realFactory, realClient := setupTestSandbox(ctx, t)
// Register a third resource (secrets) in the real factory AFTER sandbox creation
// This should NOT be synced to the fake client
_ = realFactory.Core().V1().Secrets().Informer()
// Create test objects
testPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
Namespace: "default",
UID: "pod-uid-12345",
},
Spec: v1.PodSpec{
NodeName: "test-node",
},
}
testNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node",
UID: "node-uid-67890",
},
}
testSecret := &v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "test-secret",
Namespace: "default",
UID: "secret-uid-abcde",
},
Data: map[string][]byte{
"key": []byte("value"),
},
}
// Add objects to the real client
var err error
_, err = realClient.CoreV1().Pods(testPod.Namespace).Create(ctx, testPod, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create pod in real client: %v", err)
}
_, err = realClient.CoreV1().Nodes().Create(ctx, testNode, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create node in real client: %v", err)
}
_, err = realClient.CoreV1().Secrets(testSecret.Namespace).Create(ctx, testSecret, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create secret in real client: %v", err)
}
// Helper function to wait for a resource to appear in the sandbox's fake client indexer
waitForResourceInIndexer := func(resourceType, namespace, name, uid, description string) error {
t.Logf("Waiting for %s to appear in fake client indexer...", description)
return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
exists, err := sandbox.hasObjectInIndexer(
v1.SchemeGroupVersion.WithResource(resourceType),
namespace,
name,
uid,
)
if err != nil {
t.Logf("Error checking %s in indexer: %v", description, err)
return false, nil
}
if exists {
t.Logf("%s appeared in fake client indexer", description)
return true, nil
}
return false, nil
})
}
// Wait for the pod to appear in the sandbox's fake client indexer
if err := waitForResourceInIndexer("pods", testPod.Namespace, testPod.Name, string(testPod.UID), "pod"); err != nil {
t.Fatalf("Pod did not appear in fake client indexer within timeout: %v", err)
}
// Wait for the node to appear in the sandbox's fake client indexer
if err := waitForResourceInIndexer("nodes", "", testNode.Name, string(testNode.UID), "node"); err != nil {
t.Fatalf("Node did not appear in fake client indexer within timeout: %v", err)
}
// Verify the pod can be retrieved from the fake client
retrievedPod, err := sandbox.fakeClient().CoreV1().Pods(testPod.Namespace).Get(ctx, testPod.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to retrieve pod from fake client: %v", err)
}
if retrievedPod.Namespace != testPod.Namespace || retrievedPod.Name != testPod.Name || retrievedPod.UID != testPod.UID {
t.Errorf("Retrieved pod mismatch: got namespace=%s name=%s uid=%s, want namespace=%s name=%s uid=%s",
retrievedPod.Namespace, retrievedPod.Name, retrievedPod.UID, testPod.Namespace, testPod.Name, testPod.UID)
}
t.Logf("Successfully retrieved pod from fake client: %s/%s", retrievedPod.Namespace, retrievedPod.Name)
// Verify the node can be retrieved from the fake client
retrievedNode, err := sandbox.fakeClient().CoreV1().Nodes().Get(ctx, testNode.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to retrieve node from fake client: %v", err)
}
if retrievedNode.Name != testNode.Name || retrievedNode.UID != testNode.UID {
t.Errorf("Retrieved node mismatch: got name=%s uid=%s, want name=%s uid=%s",
retrievedNode.Name, retrievedNode.UID, testNode.Name, testNode.UID)
}
t.Logf("Successfully retrieved node from fake client: %s", retrievedNode.Name)
// Wait a bit longer and verify the secret does NOT appear in the fake client indexer
// because secrets were registered AFTER the sandbox was created
t.Log("Verifying secret does NOT appear in fake client indexer...")
time.Sleep(500 * time.Millisecond) // Give extra time to ensure it's not just a timing issue
// First, verify we can get the informer for secrets in the fake factory
secretInformer, err := sandbox.fakeSharedInformerFactory().ForResource(v1.SchemeGroupVersion.WithResource("secrets"))
if err != nil {
t.Logf("Expected: Cannot get secret informer from fake factory: %v", err)
} else {
// If we can get the informer, check if the secret exists in it
key := "default/test-secret"
_, exists, err := secretInformer.Informer().GetIndexer().GetByKey(key)
if err != nil {
t.Logf("Error checking secret in fake indexer: %v", err)
}
if exists {
t.Error("Secret should NOT exist in fake client indexer (it was registered after sandbox creation)")
} else {
t.Log("Correctly verified: Secret does not exist in fake client indexer")
}
}
// Also verify that attempting to get the secret directly from fake client should fail
_, err = sandbox.fakeClient().CoreV1().Secrets(testSecret.Namespace).Get(ctx, testSecret.Name, metav1.GetOptions{})
if err == nil {
t.Error("Secret should NOT be retrievable from fake client (it was not synced)")
} else {
t.Logf("Correctly verified: Secret not retrievable from fake client: %v", err)
}
// Verify the secret IS in the real client (sanity check)
_, err = realClient.CoreV1().Secrets(testSecret.Namespace).Get(ctx, testSecret.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Secret should exist in real client but got error: %v", err)
}
t.Log("Sanity check passed: Secret exists in real client")
}
func TestKubeClientSandboxReset(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
node1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
p1 := test.BuildTestPod("p1", 100, 0, node1.Name, test.SetRSOwnerRef)
p2 := test.BuildTestPod("p2", 100, 0, node1.Name, test.SetRSOwnerRef)
sandbox, _, _ := setupTestSandbox(ctx, t, node1, p1, p2)
eviction1 := &policy.Eviction{
ObjectMeta: metav1.ObjectMeta{
Name: p1.Name,
Namespace: p1.Namespace,
},
}
eviction2 := &policy.Eviction{
ObjectMeta: metav1.ObjectMeta{
Name: p2.Name,
Namespace: p2.Namespace,
},
}
if err := sandbox.fakeClient().CoreV1().Pods(p1.Namespace).EvictV1(context.TODO(), eviction1); err != nil {
t.Fatalf("Error evicting p1: %v", err)
}
if err := sandbox.fakeClient().CoreV1().Pods(p2.Namespace).EvictV1(context.TODO(), eviction2); err != nil {
t.Fatalf("Error evicting p2: %v", err)
}
evictedPods := sandbox.evictedPodsCache.list()
if len(evictedPods) != 2 {
t.Fatalf("Expected 2 evicted pods in cache, but got %d", len(evictedPods))
}
t.Logf("Evicted pods in cache before reset: %d", len(evictedPods))
for _, evictedPod := range evictedPods {
if evictedPod.Namespace == "" || evictedPod.Name == "" || evictedPod.UID == "" {
t.Errorf("Evicted pod has empty fields: namespace=%s, name=%s, uid=%s", evictedPod.Namespace, evictedPod.Name, evictedPod.UID)
}
t.Logf("Evicted pod: %s/%s (UID: %s)", evictedPod.Namespace, evictedPod.Name, evictedPod.UID)
}
sandbox.reset()
evictedPodsAfterReset := sandbox.evictedPodsCache.list()
if len(evictedPodsAfterReset) != 0 {
t.Fatalf("Expected cache to be empty after reset, but found %d pods", len(evictedPodsAfterReset))
}
t.Logf("Successfully verified cache is empty after reset")
}
func TestKubeClientSandboxRestoreEvictedPods(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
node1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
// Create test pods
pod1 := test.BuildTestPod("pod1", 100, 0, node1.Name, test.SetRSOwnerRef)
pod2 := test.BuildTestPod("pod2", 100, 0, node1.Name, test.SetRSOwnerRef)
pod3 := test.BuildTestPod("pod3", 100, 0, node1.Name, test.SetRSOwnerRef)
pod4 := test.BuildTestPod("pod4", 100, 0, node1.Name, test.SetRSOwnerRef)
sandbox, _, realClient := setupTestSandbox(ctx, t, node1, pod1, pod2, pod3, pod4)
// Evict all pods
for _, pod := range []*v1.Pod{pod1, pod2, pod3, pod4} {
eviction := &policy.Eviction{
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
},
}
if err := sandbox.fakeClient().CoreV1().Pods(pod.Namespace).EvictV1(ctx, eviction); err != nil {
t.Fatalf("Error evicting %s: %v", pod.Name, err)
}
}
// Delete pod2 from real client to simulate it being deleted (should skip restoration)
if err := realClient.CoreV1().Pods(pod2.Namespace).Delete(ctx, pod2.Name, metav1.DeleteOptions{}); err != nil {
t.Fatalf("Error deleting pod2 from real client: %v", err)
}
// Delete and recreate pod3 with different UID in real client (should skip restoration)
if err := realClient.CoreV1().Pods(pod3.Namespace).Delete(ctx, pod3.Name, metav1.DeleteOptions{}); err != nil {
t.Fatalf("Error deleting pod3 from real client: %v", err)
}
pod3New := test.BuildTestPod("pod3", 100, 0, node1.Name, test.SetRSOwnerRef)
// Ensure pod3New has a different UID from pod3
if pod3New.UID == pod3.UID {
pod3New.UID = "new-uid-for-pod3"
}
if _, err := realClient.CoreV1().Pods(pod3New.Namespace).Create(ctx, pod3New, metav1.CreateOptions{}); err != nil {
t.Fatalf("Error recreating pod3 in real client: %v", err)
}
// Verify evicted pods are in cache
evictedPods := sandbox.evictedPodsCache.list()
if len(evictedPods) != 4 {
t.Fatalf("Expected 4 evicted pods in cache, got %d", len(evictedPods))
}
t.Logf("Evicted pods in cache: %d", len(evictedPods))
// Call restoreEvictedPods
t.Log("Calling restoreEvictedPods...")
if err := sandbox.restoreEvictedPods(ctx); err != nil {
t.Fatalf("restoreEvictedPods failed: %v", err)
}
// Verify pod1 and pod4 were restored (they exist in the fake client with matching UIDs and are accessible via the indexer)
for _, pod := range []*v1.Pod{pod1, pod4} {
// Check restoration via fake client
restoredPod, err := sandbox.fakeClient().CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
if err != nil {
t.Errorf("%s should have been restored to fake client: %v", pod.Name, err)
} else {
if restoredPod.UID != pod.UID {
t.Errorf("Restored %s UID mismatch: got %s, want %s", pod.Name, restoredPod.UID, pod.UID)
}
t.Logf("Successfully verified %s restoration: %s/%s (UID: %s)", pod.Name, restoredPod.Namespace, restoredPod.Name, restoredPod.UID)
}
// Check accessibility via indexer
exists, err := sandbox.hasObjectInIndexer(v1.SchemeGroupVersion.WithResource("pods"), pod.Namespace, pod.Name, string(pod.UID))
if err != nil {
t.Errorf("Error checking %s in indexer: %v", pod.Name, err)
}
if !exists {
t.Errorf("%s should exist in fake indexer after restoration", pod.Name)
} else {
t.Logf("Successfully verified %s exists in fake indexer", pod.Name)
}
}
// Verify pod2 was NOT restored (deleted from real client)
_, err := sandbox.fakeClient().CoreV1().Pods(pod2.Namespace).Get(ctx, pod2.Name, metav1.GetOptions{})
if err == nil {
t.Error("pod2 should NOT have been restored (was deleted from real client)")
} else {
t.Logf("Correctly verified pod2 was not restored: %v", err)
}
// Verify pod3 was NOT restored with old UID (UID mismatch case)
// Note: pod3 may exist in fake client with NEW UID due to event handler syncing,
// but it should NOT have been restored with the OLD UID from evicted cache
pod3InFake, err := sandbox.fakeClient().CoreV1().Pods(pod3.Namespace).Get(ctx, pod3.Name, metav1.GetOptions{})
if err == nil {
// Pod3 exists, but it should have the NEW UID, not the old one
if pod3InFake.UID == pod3.UID {
t.Error("pod3 should NOT have been restored with old UID (UID mismatch should prevent restoration)")
} else {
t.Logf("Correctly verified pod3 has new UID (%s), not old UID (%s) - restoration was skipped", pod3InFake.UID, pod3.UID)
}
} else {
// Pod3 doesn't exist - this is also acceptable (event handlers haven't synced it yet)
t.Logf("pod3 not found in fake client: %v", err)
}
// Verify evicted pods cache is still intact (restoreEvictedPods doesn't clear it)
evictedPodsAfter := sandbox.evictedPodsCache.list()
if len(evictedPodsAfter) != 4 {
t.Errorf("Expected evicted pods cache to still have 4 entries, got %d", len(evictedPodsAfter))
}
}
func TestKubeClientSandboxRestoreEvictedPodsEmptyCache(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
node1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
sandbox, _, _ := setupTestSandbox(ctx, t, node1)
// Call restoreEvictedPods with empty cache - should be a no-op
t.Log("Calling restoreEvictedPods with empty cache...")
if err := sandbox.restoreEvictedPods(ctx); err != nil {
t.Fatalf("restoreEvictedPods should succeed with empty cache: %v", err)
}
t.Log("Successfully verified restoreEvictedPods handles empty cache")
}
func TestEvictedPodsCache(t *testing.T) {
t.Run("add single pod", func(t *testing.T) {
const (
podName = "pod1"
podNamespace = "default"
podUID = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
)
cache := newEvictedPodsCache()
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: podNamespace,
UID: podUID,
},
}
cache.add(pod)
pods := cache.list()
if len(pods) != 1 {
t.Fatalf("Expected 1 pod in cache, got %d", len(pods))
}
if pods[0].Name != podName || pods[0].Namespace != podNamespace || pods[0].UID != podUID {
t.Errorf("Pod data mismatch: got name=%s, namespace=%s, uid=%s", pods[0].Name, pods[0].Namespace, pods[0].UID)
}
})
t.Run("add multiple pods", func(t *testing.T) {
cache := newEvictedPodsCache()
pods := []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default", UID: "11111111-1111-1111-1111-111111111111"}},
{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "kube-system", UID: "22222222-2222-2222-2222-222222222222"}},
{ObjectMeta: metav1.ObjectMeta{Name: "pod3", Namespace: "default", UID: "33333333-3333-3333-3333-333333333333"}},
}
for _, pod := range pods {
cache.add(pod)
}
cachedPods := cache.list()
if len(cachedPods) != 3 {
t.Fatalf("Expected 3 pods in cache, got %d", len(cachedPods))
}
podMap := make(map[string]*evictedPodInfo)
for _, cachedPod := range cachedPods {
podMap[cachedPod.UID] = cachedPod
}
for _, pod := range pods {
cached, ok := podMap[string(pod.UID)]
if !ok {
t.Errorf("Pod with UID %s not found in cache", pod.UID)
continue
}
if cached.Name != pod.Name || cached.Namespace != pod.Namespace {
t.Errorf("Pod data mismatch for UID %s: got name=%s, namespace=%s", pod.UID, cached.Name, cached.Namespace)
}
}
})
t.Run("add duplicate pod updates entry", func(t *testing.T) {
const (
duplicateUID = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
updatedPodName = "pod1-new"
updatedPodNS = "kube-system"
)
cache := newEvictedPodsCache()
pod1 := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Namespace: "default",
UID: duplicateUID,
},
}
pod2 := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: updatedPodName,
Namespace: updatedPodNS,
UID: duplicateUID,
},
}
cache.add(pod1)
cache.add(pod2)
pods := cache.list()
if len(pods) != 1 {
t.Fatalf("Expected 1 pod in cache (duplicates should overwrite), got %d", len(pods))
}
if pods[0].Name != updatedPodName || pods[0].Namespace != updatedPodNS {
t.Errorf("Expected pod2 data, got name=%s, namespace=%s", pods[0].Name, pods[0].Namespace)
}
})
t.Run("list returns empty array for empty cache", func(t *testing.T) {
cache := newEvictedPodsCache()
pods := cache.list()
if pods == nil {
t.Fatal("Expected non-nil slice from list()")
}
if len(pods) != 0 {
t.Fatalf("Expected empty list, got %d pods", len(pods))
}
})
t.Run("list returns copies not references", func(t *testing.T) {
const originalPodName = "pod1"
cache := newEvictedPodsCache()
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: originalPodName,
Namespace: "default",
UID: "12345678-1234-1234-1234-123456789abc",
},
}
cache.add(pod)
pods1 := cache.list()
pods2 := cache.list()
if len(pods1) != 1 || len(pods2) != 1 {
t.Fatalf("Expected 1 pod in both lists")
}
pods1[0].Name = "modified"
if pods2[0].Name == "modified" {
t.Error("Modifying list result should not affect other list results (should be copies)")
}
pods3 := cache.list()
if pods3[0].Name != originalPodName {
t.Error("Cache data was modified, list() should return copies")
}
})
t.Run("clear empties the cache", func(t *testing.T) {
cache := newEvictedPodsCache()
cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default", UID: "aaaa0000-0000-0000-0000-000000000001"}})
cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "kube-system", UID: "bbbb0000-0000-0000-0000-000000000002"}})
if len(cache.list()) != 2 {
t.Fatal("Expected 2 pods before clear")
}
cache.clear()
pods := cache.list()
if len(pods) != 0 {
t.Fatalf("Expected empty cache after clear, got %d pods", len(pods))
}
})
t.Run("clear on empty cache is safe", func(t *testing.T) {
cache := newEvictedPodsCache()
cache.clear()
pods := cache.list()
if len(pods) != 0 {
t.Fatalf("Expected empty cache, got %d pods", len(pods))
}
})
t.Run("add after clear works correctly", func(t *testing.T) {
cache := newEvictedPodsCache()
cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default", UID: "00000001-0001-0001-0001-000000000001"}})
cache.clear()
cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "kube-system", UID: "00000002-0002-0002-0002-000000000002"}})
pods := cache.list()
if len(pods) != 1 {
t.Fatalf("Expected 1 pod after clear and add, got %d", len(pods))
}
if pods[0].Name != "pod2" {
t.Errorf("Expected pod2, got %s", pods[0].Name)
}
})
}

View File

@@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
clientset "k8s.io/client-go/kubernetes"
listersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"sigs.k8s.io/descheduler/pkg/api"
@@ -78,6 +79,22 @@ func ReadyNodes(ctx context.Context, client clientset.Interface, nodeLister list
return readyNodes, nil
}
// ReadyNodesFromInterfaces converts a list of interface{} items to ready nodes.
// Each interface{} item is expected to be a *v1.Node. Only ready nodes are returned.
func ReadyNodesFromInterfaces(nodeInterfaces []interface{}) ([]*v1.Node, error) {
readyNodes := make([]*v1.Node, 0, len(nodeInterfaces))
for i, nodeInterface := range nodeInterfaces {
node, ok := nodeInterface.(*v1.Node)
if !ok {
return nil, fmt.Errorf("item at index %d is not a *v1.Node", i)
}
if IsReady(node) {
readyNodes = append(readyNodes, node)
}
}
return readyNodes, nil
}
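// The input is typically the raw []interface{} returned by an indexer query such as
// cache.Indexer.ByIndex; see the sketch after AddNodeSelectorIndexer below.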
// IsReady checks if the descheduler could run against given node.
func IsReady(node *v1.Node) bool {
for i := range node.Status.Conditions {
@@ -400,3 +417,22 @@ func podMatchesInterPodAntiAffinity(nodeIndexer podutil.GetPodsAssignedToNodeFun
return false, nil
}
// AddNodeSelectorIndexer registers an indexer on the given node informer that indexes
// nodes whose labels match nodeSelector under the provided indexer name.
func AddNodeSelectorIndexer(nodeInformer cache.SharedIndexInformer, indexerName string, nodeSelector labels.Selector) error {
return nodeInformer.AddIndexers(cache.Indexers{
indexerName: func(obj interface{}) ([]string, error) {
node, ok := obj.(*v1.Node)
if !ok {
return []string{}, errors.New("unexpected object")
}
if nodeSelector.Matches(labels.Set(node.Labels)) {
return []string{indexerName}, nil
}
return []string{}, nil
},
})
}
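// Illustrative use of the indexer together with ReadyNodesFromInterfaces (the indexer name
// and selector value are placeholders):
//
//	selector, err := labels.Parse("descheduler=enabled")
//	if err != nil {
//		return err
//	}
//	if err := AddNodeSelectorIndexer(nodeInformer, "selectedNodes", selector); err != nil {
//		return err
//	}
//	// after the informer has synced:
//	objs, err := nodeInformer.GetIndexer().ByIndex("selectedNodes", "selectedNodes")
//	if err != nil {
//		return err
//	}
//	readyNodes, err := ReadyNodesFromInterfaces(objs)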

View File

@@ -19,9 +19,12 @@ package node
import (
"context"
"errors"
"sort"
"strings"
"sync"
"testing"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -102,6 +105,183 @@ func TestReadyNodesWithNodeSelector(t *testing.T) {
}
}
func TestReadyNodesFromInterfaces(t *testing.T) {
node1 := test.BuildTestNode("node1", 1000, 2000, 9, nil)
node2 := test.BuildTestNode("node2", 1000, 2000, 9, nil)
node2.Status.Conditions = []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionFalse}}
node3 := test.BuildTestNode("node3", 1000, 2000, 9, nil)
tests := []struct {
description string
nodeInterfaces []interface{}
expectedCount int
expectedNames []string
expectError bool
errorContains string
}{
{
description: "All nodes are ready",
nodeInterfaces: []interface{}{node1, node3},
expectedCount: 2,
expectedNames: []string{"node1", "node3"},
expectError: false,
},
{
description: "One node is not ready",
nodeInterfaces: []interface{}{node1, node2, node3},
expectedCount: 2,
expectedNames: []string{"node1", "node3"},
expectError: false,
},
{
description: "Empty list",
nodeInterfaces: []interface{}{},
expectedCount: 0,
expectedNames: []string{},
expectError: false,
},
{
description: "Invalid type in list",
nodeInterfaces: []interface{}{node1, "not a node", node3},
expectedCount: 0,
expectError: true,
errorContains: "item at index 1 is not a *v1.Node",
},
}
for _, tc := range tests {
t.Run(tc.description, func(t *testing.T) {
nodes, err := ReadyNodesFromInterfaces(tc.nodeInterfaces)
if tc.expectError {
if err == nil {
t.Errorf("Expected error but got none")
} else if tc.errorContains != "" && !strings.Contains(err.Error(), tc.errorContains) {
t.Errorf("Expected error to contain '%s', got '%s'", tc.errorContains, err.Error())
}
return
}
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if len(nodes) != tc.expectedCount {
t.Errorf("Expected %d nodes, got %d", tc.expectedCount, len(nodes))
}
for i, expectedName := range tc.expectedNames {
if i >= len(nodes) {
t.Errorf("Missing node at index %d, expected %s", i, expectedName)
continue
}
if nodes[i].Name != expectedName {
t.Errorf("Expected node at index %d to be %s, got %s", i, expectedName, nodes[i].Name)
}
}
})
}
}
func TestAddNodeSelectorIndexer(t *testing.T) {
node1 := test.BuildTestNode("node1", 1000, 2000, 9, nil)
node1.Labels = map[string]string{"type": "compute", "zone": "us-east-1"}
node2 := test.BuildTestNode("node2", 1000, 2000, 9, nil)
node2.Labels = map[string]string{"type": "infra", "zone": "us-west-1"}
node3 := test.BuildTestNode("node3", 1000, 2000, 9, nil)
node3.Labels = map[string]string{"type": "compute", "zone": "us-west-1"}
tests := []struct {
description string
indexerName string
selectorString string
expectedMatches []string
}{
{
description: "Index nodes by type=compute",
indexerName: "computeNodes",
selectorString: "type=compute",
expectedMatches: []string{"node1", "node3"},
},
{
description: "Index nodes by type=infra",
indexerName: "infraNodes",
selectorString: "type=infra",
expectedMatches: []string{"node2"},
},
{
description: "Index nodes by zone=us-west-1",
indexerName: "westZoneNodes",
selectorString: "zone=us-west-1",
expectedMatches: []string{"node2", "node3"},
},
{
description: "Index nodes with multiple labels",
indexerName: "computeEastNodes",
selectorString: "type=compute,zone=us-east-1",
expectedMatches: []string{"node1"},
},
{
description: "No matching nodes",
indexerName: "noMatchNodes",
selectorString: "type=storage",
expectedMatches: []string{},
},
}
for _, tc := range tests {
t.Run(tc.description, func(t *testing.T) {
fakeClient := fake.NewSimpleClientset(node1, node2, node3)
sharedInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
nodeInformer := sharedInformerFactory.Core().V1().Nodes().Informer()
selector, err := labels.Parse(tc.selectorString)
if err != nil {
t.Fatalf("Failed to parse selector: %v", err)
}
err = AddNodeSelectorIndexer(nodeInformer, tc.indexerName, selector)
if err != nil {
t.Fatalf("AddNodeSelectorIndexer failed: %v", err)
}
stopChannel := make(chan struct{})
sharedInformerFactory.Start(stopChannel)
sharedInformerFactory.WaitForCacheSync(stopChannel)
defer close(stopChannel)
indexer := nodeInformer.GetIndexer()
objs, err := indexer.ByIndex(tc.indexerName, tc.indexerName)
if err != nil {
t.Errorf("Failed to query indexer: %v", err)
return
}
// Extract node names from the results
actualMatches := make([]string, 0, len(objs))
for _, obj := range objs {
node, ok := obj.(*v1.Node)
if !ok {
t.Errorf("Expected *v1.Node, got %T", obj)
continue
}
actualMatches = append(actualMatches, node.Name)
}
// Sort both slices for consistent comparison
sort.Strings(actualMatches)
expectedMatches := make([]string, len(tc.expectedMatches))
copy(expectedMatches, tc.expectedMatches)
sort.Strings(expectedMatches)
// Compare using cmp.Diff
if diff := cmp.Diff(expectedMatches, actualMatches); diff != "" {
t.Errorf("Node matches mismatch (-want +got):\n%s", diff)
}
})
}
}
func TestIsNodeUnschedulable(t *testing.T) {
tests := []struct {
description string

View File

@@ -23,6 +23,7 @@ type HandleImpl struct {
PodEvictorImpl *evictions.PodEvictor
MetricsCollectorImpl *metricscollector.MetricsCollector
PrometheusClientImpl promapi.Client
PluginInstanceIDImpl string
}
var _ frameworktypes.Handle = &HandleImpl{}
@@ -62,3 +63,7 @@ func (hi *HandleImpl) PreEvictionFilter(pod *v1.Pod) bool {
func (hi *HandleImpl) Evict(ctx context.Context, pod *v1.Pod, opts evictions.EvictOptions) error {
return hi.PodEvictorImpl.EvictPod(ctx, pod, opts)
}
func (hi *HandleImpl) PluginInstanceID() string {
return hi.PluginInstanceIDImpl
}

View File

@@ -73,6 +73,22 @@ func NewPluginFncFromFake(fp *FakePlugin) pluginregistry.PluginBuilder {
}
}
func NewPluginFncFromFakeWithReactor(fp *FakePlugin, callback func(ActionImpl)) pluginregistry.PluginBuilder {
return func(ctx context.Context, args runtime.Object, handle frameworktypes.Handle) (frameworktypes.Plugin, error) {
fakePluginArgs, ok := args.(*FakePluginArgs)
if !ok {
return nil, fmt.Errorf("want args to be of type FakePluginArgs, got %T", args)
}
fp.handle = handle
fp.args = fakePluginArgs
callback(ActionImpl{handle: fp.handle})
return fp, nil
}
}
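// The callback receives an ActionImpl wrapping the plugin's handle, which lets tests capture
// handle-scoped values (for example PrometheusClient() or PluginInstanceID()) at plugin
// construction time.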
// New builds plugin from its arguments while passing a handle
func New(ctx context.Context, args runtime.Object, handle frameworktypes.Handle) (frameworktypes.Plugin, error) {
fakePluginArgs, ok := args.(*FakePluginArgs)

View File

@@ -78,6 +78,14 @@ type handleImpl struct {
var _ frameworktypes.Handle = &handleImpl{}
// pluginHandle wraps a shared handleImpl and adds a plugin-specific instance ID
type pluginHandle struct {
*handleImpl
pluginInstanceID string
}
var _ frameworktypes.Handle = &pluginHandle{}
// ClientSet retrieves kube client set
func (hi *handleImpl) ClientSet() clientset.Interface {
return hi.clientSet
@@ -106,6 +114,17 @@ func (hi *handleImpl) Evictor() frameworktypes.Evictor {
return hi.evictor
}
// PluginInstanceID is not implemented on the base handle and panics if called directly.
// Plugins receive a pluginHandle, which carries a specific instance ID.
func (hi *handleImpl) PluginInstanceID() string {
panic(fmt.Errorf("Not implemented"))
}
// PluginInstanceID returns a unique identifier for this plugin instance.
func (ph *pluginHandle) PluginInstanceID() string {
return ph.pluginInstanceID
}
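// For example, with WithProfileInstanceID("0") and two enabled plugins, the resulting handles
// report IDs such as "0-0" and "0-1"; the index part follows the (unsorted) iteration order of
// the enabled plugin set, so only uniqueness within a profile is guaranteed.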
type filterPlugin interface {
frameworktypes.Plugin
Filter(pod *v1.Pod) bool
@@ -142,6 +161,7 @@ type handleImplOpts struct {
getPodsAssignedToNodeFunc podutil.GetPodsAssignedToNodeFunc
podEvictor *evictions.PodEvictor
metricsCollector *metricscollector.MetricsCollector
profileInstanceID string
}
// WithClientSet sets clientSet for the scheduling frameworkImpl.
@@ -182,6 +202,14 @@ func WithMetricsCollector(metricsCollector *metricscollector.MetricsCollector) O
}
}
// WithProfileInstanceID sets the profile instance ID for the handle.
// This will be used to construct unique plugin instance IDs.
func WithProfileInstanceID(profileInstanceID string) Option {
return func(o *handleImplOpts) {
o.profileInstanceID = profileInstanceID
}
}
func getPluginConfig(pluginName string, pluginConfigs []api.PluginConfig) (*api.PluginConfig, int) {
for idx, pluginConfig := range pluginConfigs {
if pluginConfig.Name == pluginName {
@@ -191,7 +219,7 @@ func getPluginConfig(pluginName string, pluginConfigs []api.PluginConfig) (*api.
return nil, 0
}
func buildPlugin(ctx context.Context, config api.DeschedulerProfile, pluginName string, handle *handleImpl, reg pluginregistry.Registry) (frameworktypes.Plugin, error) {
func buildPlugin(ctx context.Context, config api.DeschedulerProfile, pluginName string, handle frameworktypes.Handle, reg pluginregistry.Registry) (frameworktypes.Plugin, error) {
pc, _ := getPluginConfig(pluginName, config.PluginConfigs)
if pc == nil {
klog.ErrorS(fmt.Errorf("unable to get plugin config"), "skipping plugin", "plugin", pluginName, "profile", config.Name)
@@ -272,6 +300,7 @@ func NewProfile(ctx context.Context, config api.DeschedulerProfile, reg pluginre
return nil, fmt.Errorf("profile %q configures preEvictionFilter extension point of non-existing plugins: %v", config.Name, sets.New(config.Plugins.PreEvictionFilter.Enabled...).Difference(pi.preEvictionFilter))
}
// Create a base handle that will be used as a template for plugin-specific handles
handle := &handleImpl{
clientSet: hOpts.clientSet,
getPodsAssignedToNodeFunc: hOpts.getPodsAssignedToNodeFunc,
@@ -284,20 +313,26 @@ func NewProfile(ctx context.Context, config api.DeschedulerProfile, reg pluginre
prometheusClient: hOpts.prometheusClient,
}
// Collect all unique plugin names across all extension points
pluginNames := append(config.Plugins.Deschedule.Enabled, config.Plugins.Balance.Enabled...)
pluginNames = append(pluginNames, config.Plugins.Filter.Enabled...)
pluginNames = append(pluginNames, config.Plugins.PreEvictionFilter.Enabled...)
// Build each unique plugin only once with a unique plugin instance ID
plugins := make(map[string]frameworktypes.Plugin)
for _, plugin := range sets.New(pluginNames...).UnsortedList() {
pg, err := buildPlugin(ctx, config, plugin, handle, reg)
for idx, pluginName := range sets.New(pluginNames...).UnsortedList() {
ph := &pluginHandle{
handleImpl: handle,
pluginInstanceID: fmt.Sprintf("%s-%d", hOpts.profileInstanceID, idx),
}
pg, err := buildPlugin(ctx, config, pluginName, ph, reg)
if err != nil {
return nil, fmt.Errorf("unable to build %v plugin: %v", plugin, err)
return nil, fmt.Errorf("unable to build %v plugin: %v", pluginName, err)
}
if pg == nil {
return nil, fmt.Errorf("got empty %v plugin build", plugin)
return nil, fmt.Errorf("got empty %v plugin build", pluginName)
}
plugins[plugin] = pg
plugins[pluginName] = pg
}
// Later, when a default list of plugins and their extension points is established,

View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sort"
"strings"
"testing"
"github.com/google/go-cmp/cmp"
@@ -542,3 +543,325 @@ func TestProfileExtensionPointOrdering(t *testing.T) {
t.Errorf("check for balance invocation order failed. Results are not deep equal. mismatch (-want +got):\n%s", diff)
}
}
// verifyInstanceIDsMatch verifies that instance IDs captured at creation, deschedule, and balance match
func verifyInstanceIDsMatch(t *testing.T, profileInstanceID string, pluginNames []string, creationIDs, descheduleIDs, balanceIDs map[string]string) {
for _, pluginName := range pluginNames {
creationID := creationIDs[pluginName]
descheduleID := descheduleIDs[pluginName]
balanceID := balanceIDs[pluginName]
if creationID == "" {
t.Errorf("Profile %s, plugin %s: plugin creation did not capture instance ID", profileInstanceID, pluginName)
}
if descheduleID == "" {
t.Errorf("Profile %s, plugin %s: deschedule extension point did not capture instance ID", profileInstanceID, pluginName)
}
if balanceID == "" {
t.Errorf("Profile %s, plugin %s: balance extension point did not capture instance ID", profileInstanceID, pluginName)
}
// Verify all IDs match
if creationID != descheduleID {
t.Errorf("Profile %s, plugin %s: instance ID mismatch - creation: %s, deschedule: %s", profileInstanceID, pluginName, creationID, descheduleID)
}
if creationID != balanceID {
t.Errorf("Profile %s, plugin %s: instance ID mismatch - creation: %s, balance: %s", profileInstanceID, pluginName, creationID, balanceID)
}
if descheduleID != balanceID {
t.Errorf("Profile %s, plugin %s: instance ID mismatch - deschedule: %s, balance: %s", profileInstanceID, pluginName, descheduleID, balanceID)
}
}
}
// verifyInstanceIDFormat verifies that instance IDs have correct format and sequential indices
func verifyInstanceIDFormat(t *testing.T, profileInstanceID string, pluginNames []string, pluginIDs map[string]string) sets.Set[string] {
if len(pluginIDs) != len(pluginNames) {
t.Errorf("Profile %s: expected %d plugins to be invoked, got %d", profileInstanceID, len(pluginNames), len(pluginIDs))
}
// Collect all instance IDs for this profile
profileInstanceIDs := sets.New[string]()
for pluginName, instanceID := range pluginIDs {
if instanceID == "" {
t.Errorf("Profile %s, plugin %s: expected instance ID to be set, got empty string", profileInstanceID, pluginName)
}
profileInstanceIDs.Insert(instanceID)
}
// Verify all IDs within this profile are unique
if profileInstanceIDs.Len() != len(pluginIDs) {
t.Errorf("Profile %s: duplicate instance IDs found", profileInstanceID)
}
// Verify all IDs match the expected format: "{profileInstanceID}-{index}"
// and contain sequential indices from 0 to n-1
expectedIndices := sets.New[int]()
for i := 0; i < len(pluginNames); i++ {
expectedIndices.Insert(i)
}
actualIndices := sets.New[int]()
for pluginName, instanceID := range pluginIDs {
var idx int
expectedPrefix := profileInstanceID + "-"
if !strings.HasPrefix(instanceID, expectedPrefix) {
t.Errorf("Profile %s, plugin %s: instance ID %s does not start with %s", profileInstanceID, pluginName, instanceID, expectedPrefix)
continue
}
_, err := fmt.Sscanf(instanceID, profileInstanceID+"-%d", &idx)
if err != nil {
t.Errorf("Profile %s, plugin %s: instance ID %s does not match expected format", profileInstanceID, pluginName, instanceID)
continue
}
actualIndices.Insert(idx)
}
// Verify we have indices 0 through n-1
diff := cmp.Diff(expectedIndices, actualIndices)
if diff != "" {
t.Errorf("Profile %s: instance ID indices mismatch (-want +got):\n%s", profileInstanceID, diff)
}
return profileInstanceIDs
}
func TestPluginInstanceIDs(t *testing.T) {
tests := []struct {
name string
profiles []struct {
profileInstanceID string
pluginNames []string
}
}{
{
name: "single plugin gets instance ID",
profiles: []struct {
profileInstanceID string
pluginNames []string
}{
{
profileInstanceID: "0",
pluginNames: []string{"TestPlugin"},
},
},
},
{
name: "two plugins get different instance IDs",
profiles: []struct {
profileInstanceID string
pluginNames []string
}{
{
profileInstanceID: "0",
pluginNames: []string{"Plugin_0", "Plugin_1"},
},
},
},
{
name: "three profiles with two plugins each get unique instance IDs",
profiles: []struct {
profileInstanceID string
pluginNames []string
}{
{
profileInstanceID: "0",
pluginNames: []string{"Plugin_A", "Plugin_B"},
},
{
profileInstanceID: "1",
pluginNames: []string{"Plugin_C", "Plugin_D"},
},
{
profileInstanceID: "2",
pluginNames: []string{"Plugin_E", "Plugin_F"},
},
},
},
{
name: "three profiles with same plugin names get different instance IDs per profile",
profiles: []struct {
profileInstanceID string
pluginNames []string
}{
{
profileInstanceID: "0",
pluginNames: []string{"CommonPlugin_X", "CommonPlugin_Y"},
},
{
profileInstanceID: "1",
pluginNames: []string{"CommonPlugin_X", "CommonPlugin_Y"},
},
{
profileInstanceID: "2",
pluginNames: []string{"CommonPlugin_X", "CommonPlugin_Y"},
},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
n1 := testutils.BuildTestNode("n1", 2000, 3000, 10, nil)
n2 := testutils.BuildTestNode("n2", 2000, 3000, 10, nil)
nodes := []*v1.Node{n1, n2}
// Track instance IDs by profile from different stages
profileDescheduleIDs := make(map[string]map[string]string) // profileInstanceID -> pluginName -> instanceID (from Deschedule execution)
profileBalanceIDs := make(map[string]map[string]string) // profileInstanceID -> pluginName -> instanceID (from Balance execution)
profileCreationIDs := make(map[string]map[string]string) // profileInstanceID -> pluginName -> instanceID (from plugin creation)
registry := pluginregistry.NewRegistry()
// Collect all distinct plugin names across all profiles
allPluginNames := sets.New[string]()
for _, profileCfg := range test.profiles {
allPluginNames.Insert(profileCfg.pluginNames...)
}
// Helper function to validate and store instance ID
captureInstanceID := func(instanceID, pluginName string, targetMap map[string]map[string]string) {
parts := strings.Split(instanceID, "-")
if len(parts) < 2 {
t.Fatalf("Plugin %s: instance ID %s does not have expected format 'profileID-index'", pluginName, instanceID)
}
profileID := parts[0]
if targetMap[profileID] == nil {
targetMap[profileID] = make(map[string]string)
}
targetMap[profileID][pluginName] = instanceID
}
// Register all plugins before creating profiles
for _, pluginName := range allPluginNames.UnsortedList() {
// Capture plugin name for closure
name := pluginName
pluginregistry.Register(
pluginName,
func(ctx context.Context, args runtime.Object, handle frameworktypes.Handle) (frameworktypes.Plugin, error) {
fakePlugin := &fakeplugin.FakePlugin{PluginName: name}
fakePlugin.AddReactor(string(frameworktypes.DescheduleExtensionPoint), func(action fakeplugin.Action) (handled, filter bool, err error) {
if dAction, ok := action.(fakeplugin.DescheduleAction); ok {
captureInstanceID(dAction.Handle().PluginInstanceID(), name, profileDescheduleIDs)
return true, false, nil
}
return false, false, nil
})
fakePlugin.AddReactor(string(frameworktypes.BalanceExtensionPoint), func(action fakeplugin.Action) (handled, filter bool, err error) {
if bAction, ok := action.(fakeplugin.BalanceAction); ok {
captureInstanceID(bAction.Handle().PluginInstanceID(), name, profileBalanceIDs)
return true, false, nil
}
return false, false, nil
})
// Use NewPluginFncFromFakeWithReactor to wrap and capture instance ID at creation
builder := fakeplugin.NewPluginFncFromFakeWithReactor(fakePlugin, func(action fakeplugin.ActionImpl) {
captureInstanceID(action.Handle().PluginInstanceID(), name, profileCreationIDs)
})
return builder(ctx, args, handle)
},
&fakeplugin.FakePlugin{},
&fakeplugin.FakePluginArgs{},
fakeplugin.ValidateFakePluginArgs,
fakeplugin.SetDefaults_FakePluginArgs,
registry,
)
}
client := fakeclientset.NewSimpleClientset(n1, n2)
handle, podEvictor, err := frameworktesting.InitFrameworkHandle(
ctx,
client,
nil,
defaultevictor.DefaultEvictorArgs{},
nil,
)
if err != nil {
t.Fatalf("Unable to initialize a framework handle: %v", err)
}
// Create all profiles
var profiles []*profileImpl
for _, profileCfg := range test.profiles {
var pluginConfigs []api.PluginConfig
for _, pluginName := range profileCfg.pluginNames {
pluginConfigs = append(pluginConfigs, api.PluginConfig{
Name: pluginName,
Args: &fakeplugin.FakePluginArgs{},
})
}
prfl, err := NewProfile(
ctx,
api.DeschedulerProfile{
Name: "test-profile",
PluginConfigs: pluginConfigs,
Plugins: api.Plugins{
Deschedule: api.PluginSet{
Enabled: profileCfg.pluginNames,
},
Balance: api.PluginSet{
Enabled: profileCfg.pluginNames,
},
},
},
registry,
WithClientSet(client),
WithSharedInformerFactory(handle.SharedInformerFactoryImpl),
WithPodEvictor(podEvictor),
WithGetPodsAssignedToNodeFnc(handle.GetPodsAssignedToNodeFuncImpl),
WithProfileInstanceID(profileCfg.profileInstanceID),
)
if err != nil {
t.Fatalf("unable to create profile: %v", err)
}
profiles = append(profiles, prfl)
}
// Run deschedule and balance plugins for all profiles
for _, prfl := range profiles {
prfl.RunDeschedulePlugins(ctx, nodes)
prfl.RunBalancePlugins(ctx, nodes)
}
// Verify creation, deschedule, and balance IDs all match
for _, profileCfg := range test.profiles {
verifyInstanceIDsMatch(
t,
profileCfg.profileInstanceID,
profileCfg.pluginNames,
profileCreationIDs[profileCfg.profileInstanceID],
profileDescheduleIDs[profileCfg.profileInstanceID],
profileBalanceIDs[profileCfg.profileInstanceID],
)
}
// Verify all plugins were invoked and have correct instance IDs
allInstanceIDs := sets.New[string]()
for _, profileCfg := range test.profiles {
profileInstanceIDs := verifyInstanceIDFormat(
t,
profileCfg.profileInstanceID,
profileCfg.pluginNames,
profileDescheduleIDs[profileCfg.profileInstanceID],
)
allInstanceIDs = allInstanceIDs.Union(profileInstanceIDs)
}
// Verify all instance IDs are unique across all profiles
totalExpectedPlugins := 0
for _, profileCfg := range test.profiles {
totalExpectedPlugins += len(profileCfg.pluginNames)
}
if allInstanceIDs.Len() != totalExpectedPlugins {
t.Errorf("Expected %d unique instance IDs across all profiles, got %d", totalExpectedPlugins, allInstanceIDs.Len())
}
})
}
}
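The test above asserts that every plugin instance ends up with an ID of the form "{profileInstanceID}-{index}", with indices running from 0 to n-1 within each profile. As a minimal sketch of that expected shape (an illustration only, not the descheduler's actual implementation):
package main
import "fmt"
// buildPluginInstanceID illustrates the ID shape the test expects:
// "{profileInstanceID}-{index}", e.g. "0-0", "0-1", "1-0".
func buildPluginInstanceID(profileInstanceID string, pluginIndex int) string {
	return fmt.Sprintf("%s-%d", profileInstanceID, pluginIndex)
}
func main() {
	for i, plugin := range []string{"CommonPlugin_X", "CommonPlugin_Y"} {
		// Profile "1" with two plugins yields "1-0" and "1-1".
		fmt.Printf("%s -> %s\n", plugin, buildPluginInstanceID("1", i))
	}
}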


@@ -41,6 +41,9 @@ type Handle interface {
GetPodsAssignedToNodeFunc() podutil.GetPodsAssignedToNodeFunc
SharedInformerFactory() informers.SharedInformerFactory
MetricsCollector() *metricscollector.MetricsCollector
// PluginInstanceID returns a unique identifier for this plugin instance.
// The ID is unique across all plugin instances in a configuration.
PluginInstanceID() string
}
// Evictor defines an interface for filtering and evicting pods
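A plugin can read this ID from its handle, for example to tag its own log lines or metrics so that several instances of the same plugin type stay distinguishable. A minimal sketch, assuming a hypothetical plugin; the type and constructor names below are illustrative and not part of the descheduler API, while the frameworktypes alias matches its use in the tests above:
import (
	"context"
	"k8s.io/apimachinery/pkg/runtime"
	frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
)
// hypotheticalPlugin captures its instance ID once, at construction time.
type hypotheticalPlugin struct {
	handle     frameworktypes.Handle
	instanceID string
}
func newHypotheticalPlugin(ctx context.Context, args runtime.Object, handle frameworktypes.Handle) (frameworktypes.Plugin, error) {
	return &hypotheticalPlugin{
		handle: handle,
		// Unique across all plugin instances in the configuration.
		instanceID: handle.PluginInstanceID(),
	}, nil
}
func (p *hypotheticalPlugin) Name() string { return "HypotheticalPlugin" }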


@@ -43,6 +43,10 @@ import (
// BuildTestPod creates a test pod with given parameters.
func BuildTestPod(name string, cpu, memory int64, nodeName string, apply func(*v1.Pod)) *v1.Pod {
pod := &v1.Pod{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Pod",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: name,
@@ -76,6 +80,10 @@ func BuildTestPod(name string, cpu, memory int64, nodeName string, apply func(*v
func BuildTestPDB(name, appLabel string) *policyv1.PodDisruptionBudget {
maxUnavailable := intstr.FromInt32(1)
pdb := &policyv1.PodDisruptionBudget{
TypeMeta: metav1.TypeMeta{
APIVersion: "policy/v1",
Kind: "PodDisruptionBudget",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: name,
@@ -94,6 +102,10 @@ func BuildTestPDB(name, appLabel string) *policyv1.PodDisruptionBudget {
func BuildTestPVC(name, storageClass string) *v1.PersistentVolumeClaim {
pvc := &v1.PersistentVolumeClaim{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "PersistentVolumeClaim",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: name,
@@ -114,6 +126,10 @@ func BuildTestPVC(name, storageClass string) *v1.PersistentVolumeClaim {
// BuildPodMetrics creates a test podmetrics with given parameters.
func BuildPodMetrics(name string, millicpu, mem int64) *v1beta1.PodMetrics {
return &v1beta1.PodMetrics{
TypeMeta: metav1.TypeMeta{
APIVersion: "metrics.k8s.io/v1beta1",
Kind: "PodMetrics",
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "default",
@@ -171,6 +187,10 @@ func GetDaemonSetOwnerRefList() []metav1.OwnerReference {
// BuildTestNode creates a node with specified capacity.
func BuildTestNode(name string, millicpu, mem, pods int64, apply func(*v1.Node)) *v1.Node {
node := &v1.Node{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Node",
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
SelfLink: fmt.Sprintf("/api/v1/nodes/%s", name),
@@ -201,6 +221,10 @@ func BuildTestNode(name string, millicpu, mem, pods int64, apply func(*v1.Node))
func BuildNodeMetrics(name string, millicpu, mem int64) *v1beta1.NodeMetrics {
return &v1beta1.NodeMetrics{
TypeMeta: metav1.TypeMeta{
APIVersion: "metrics.k8s.io/v1beta1",
Kind: "NodeMetrics",
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
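With TypeMeta filled in, the objects returned by these builders carry their APIVersion and Kind directly, so callers that inspect the fixtures' GroupVersionKind no longer see empty fields. A quick standalone illustration; the import path is assumed to be the repository's test utilities package, aliased as in the tests above:
package main
import (
	"fmt"
	testutils "sigs.k8s.io/descheduler/test"
)
func main() {
	node := testutils.BuildTestNode("n1", 2000, 3000, 10, nil)
	pod := testutils.BuildTestPod("p1", 100, 100, node.Name, nil)
	// Both fixtures now report a populated TypeMeta (promoted fields).
	fmt.Println(node.APIVersion, node.Kind) // v1 Node
	fmt.Println(pod.APIVersion, pod.Kind)   // v1 Pod
}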