Mirror of https://github.com/kubernetes-sigs/descheduler.git (synced 2026-01-26 13:29:11 +01:00)

Compare commits: 7d2c31cd39...master (23 commits)

Commits: 0b4fd3544c, fe2fb603f4, a500ff9c64, 263db33052, 45dc5a20d3, f520856095, e53b3d5dce, 72d61286eb, 770ec5affa, 38d99dd0c3, 8f5a83279e, 4daa7e2fbf, 433f0dbb8c, cc96a3ee7a, ff580a0eff, 4af097a806, b3f0184af8, 881ead3ed2, fc6d0d1132, 85b1d97dda, b6aadc1643, c4ec31684f, 93a516a58a
@@ -30,14 +30,9 @@ import (
	"go.opentelemetry.io/otel/trace"

	v1 "k8s.io/api/core/v1"
	policy "k8s.io/api/policy/v1"
	policyv1 "k8s.io/api/policy/v1"
	schedulingv1 "k8s.io/api/scheduling/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	utilversion "k8s.io/apimachinery/pkg/util/version"
	"k8s.io/apimachinery/pkg/util/wait"
@@ -47,7 +42,6 @@ import (
	fakeclientset "k8s.io/client-go/kubernetes/fake"
	corev1listers "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/rest"
	core "k8s.io/client-go/testing"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/events"
	"k8s.io/client-go/util/workqueue"
@@ -74,6 +68,7 @@ import (
const (
	prometheusAuthTokenSecretKey = "prometheusAuthToken"
	workQueueKey = "key"
	indexerNodeSelectorGlobal = "indexer_node_selector_global"
)

type eprunner func(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status
@@ -85,14 +80,14 @@ type profileRunner struct {

type descheduler struct {
	rs *options.DeschedulerServer
	ir *informerResources
	client clientset.Interface
	kubeClientSandbox *kubeClientSandbox
	getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc
	sharedInformerFactory informers.SharedInformerFactory
	namespacedSecretsLister corev1listers.SecretNamespaceLister
	deschedulerPolicy *api.DeschedulerPolicy
	eventRecorder events.EventRecorder
	podEvictor *evictions.PodEvictor
	podEvictionReactionFnc func(*fakeclientset.Clientset) func(action core.Action) (bool, runtime.Object, error)
	metricsCollector *metricscollector.MetricsCollector
	prometheusClient promapi.Client
	previousPrometheusClientTransport *http.Transport
@@ -101,48 +96,20 @@ type descheduler struct {
	metricsProviders map[api.MetricsSource]*api.MetricsProvider
}

type informerResources struct {
	sharedInformerFactory informers.SharedInformerFactory
	resourceToInformer map[schema.GroupVersionResource]informers.GenericInformer
func nodeSelectorFromPolicy(deschedulerPolicy *api.DeschedulerPolicy) (labels.Selector, error) {
	nodeSelector := labels.Everything()
	if deschedulerPolicy.NodeSelector != nil {
		sel, err := labels.Parse(*deschedulerPolicy.NodeSelector)
		if err != nil {
			return nil, err
		}
		nodeSelector = sel
	}
	return nodeSelector, nil
}
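The selector string accepted by nodeSelectorFromPolicy uses the standard Kubernetes label-selector syntax handled by apimachinery's labels.Parse. The snippet below is an illustrative sketch (not part of the diff) showing how such a parsed selector matches a node's label set:

// Illustrative only: standard apimachinery label-selector parsing, the same
// mechanism nodeSelectorFromPolicy relies on.
sel, err := labels.Parse("environment=production,zone in (us-east-1a, us-east-1b)")
if err != nil {
	// handle the parse error
}
nodeLabels := labels.Set{"environment": "production", "zone": "us-east-1a"}
matches := sel.Matches(nodeLabels) // true for this label set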
func newInformerResources(sharedInformerFactory informers.SharedInformerFactory) *informerResources {
	return &informerResources{
		sharedInformerFactory: sharedInformerFactory,
		resourceToInformer: make(map[schema.GroupVersionResource]informers.GenericInformer),
	}
}

func (ir *informerResources) Uses(resources ...schema.GroupVersionResource) error {
	for _, resource := range resources {
		informer, err := ir.sharedInformerFactory.ForResource(resource)
		if err != nil {
			return err
		}

		ir.resourceToInformer[resource] = informer
	}
	return nil
}

// CopyTo copies informer subscriptions to the new factory and objects to the fake client so that the backing caches are populated when listers are used.
func (ir *informerResources) CopyTo(fakeClient *fakeclientset.Clientset, newFactory informers.SharedInformerFactory) error {
	for resource, informer := range ir.resourceToInformer {
		_, err := newFactory.ForResource(resource)
		if err != nil {
			return fmt.Errorf("error getting resource %s: %w", resource, err)
		}

		objects, err := informer.Lister().List(labels.Everything())
		if err != nil {
			return fmt.Errorf("error listing %s: %w", informer, err)
		}

		for _, object := range objects {
			fakeClient.Tracker().Add(object)
		}
	}
	return nil
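CopyTo exists so that a dry run starts from a populated cache: objects listed from the real informers are pushed into a fake client, and a new informer factory built on that fake client serves them to listers. A minimal sketch of the underlying client-go pattern, not part of the diff (objectsListedFromRealInformers is a placeholder slice of runtime.Object):

// Seed a fake clientset's tracker and serve the objects through a fresh factory.
fakeClient := fakeclientset.NewSimpleClientset()
for _, obj := range objectsListedFromRealInformers {
	_ = fakeClient.Tracker().Add(obj)
}
fakeFactory := informers.NewSharedInformerFactory(fakeClient, 0)
_ = fakeFactory.Core().V1().Pods().Informer() // register the informers you need
fakeFactory.Start(ctx.Done())
fakeFactory.WaitForCacheSync(ctx.Done())
pods, _ := fakeFactory.Core().V1().Pods().Lister().List(labels.Everything())
_ = pods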
func addNodeSelectorIndexer(sharedInformerFactory informers.SharedInformerFactory, nodeSelector labels.Selector) error {
	return nodeutil.AddNodeSelectorIndexer(sharedInformerFactory.Core().V1().Nodes().Informer(), indexerNodeSelectorGlobal, nodeSelector)
}
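nodeutil.AddNodeSelectorIndexer is descheduler's own helper and its body is not shown in this diff. As a hedged sketch of the client-go primitives such a helper could be built on, a selector-based indexer can be registered on the node informer and later queried with ByIndex (this is an assumed shape, not the actual implementation):

// Rough sketch only: index nodes matching nodeSelector under one well-known key.
nodeInformer := sharedInformerFactory.Core().V1().Nodes().Informer()
_ = nodeInformer.AddIndexers(cache.Indexers{
	indexerNodeSelectorGlobal: func(obj interface{}) ([]string, error) {
		node, ok := obj.(*v1.Node)
		if !ok {
			return nil, nil
		}
		if nodeSelector.Matches(labels.Set(node.Labels)) {
			return []string{indexerNodeSelectorGlobal}, nil
		}
		return nil, nil
	},
})
// runProfiles later retrieves the matching nodes via
// nodeInformer.GetIndexer().ByIndex(indexerNodeSelectorGlobal, indexerNodeSelectorGlobal).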
func metricsProviderListToMap(providersList []api.MetricsProvider) map[api.MetricsSource]*api.MetricsProvider {
@@ -153,19 +120,28 @@ func metricsProviderListToMap(providersList []api.MetricsProvider) map[api.Metri
	return providersMap
}

func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string, eventRecorder events.EventRecorder, sharedInformerFactory, namespacedSharedInformerFactory informers.SharedInformerFactory) (*descheduler, error) {
	podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
// setupPrometheusProvider sets up the prometheus provider on the descheduler if configured
func setupPrometheusProvider(d *descheduler, namespacedSharedInformerFactory informers.SharedInformerFactory) error {
	prometheusProvider := d.metricsProviders[api.PrometheusMetrics]
	if prometheusProvider != nil && prometheusProvider.Prometheus != nil && prometheusProvider.Prometheus.AuthToken != nil {
		authTokenSecret := prometheusProvider.Prometheus.AuthToken.SecretReference
		if authTokenSecret == nil || authTokenSecret.Namespace == "" {
			return fmt.Errorf("prometheus metrics source configuration is missing authentication token secret")
		}
		if namespacedSharedInformerFactory == nil {
			return fmt.Errorf("namespacedSharedInformerFactory not configured")
		}
		namespacedSharedInformerFactory.Core().V1().Secrets().Informer().AddEventHandler(d.eventHandler())
		d.namespacedSecretsLister = namespacedSharedInformerFactory.Core().V1().Secrets().Lister().Secrets(authTokenSecret.Namespace)
	}
	return nil
}
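The namespaced secrets lister wired up here is what the token reconciliation code (not shown in these hunks) can use to read the Prometheus auth token. A hedged sketch of that lookup, assuming the SecretReference also carries a Name field:

// Illustrative only; the real reconciliation logic lives elsewhere in this file.
secret, err := d.namespacedSecretsLister.Get(authTokenSecret.Name)
if err != nil {
	return fmt.Errorf("unable to get the auth token secret: %w", err)
}
token := string(secret.Data[prometheusAuthTokenSecretKey])
if token == "" {
	return fmt.Errorf("prometheus auth token secret is missing the %q key", prometheusAuthTokenSecretKey)
}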
	ir := newInformerResources(sharedInformerFactory)
	ir.Uses(v1.SchemeGroupVersion.WithResource("pods"),
		v1.SchemeGroupVersion.WithResource("nodes"),
		// Future work could be to let each plugin declare what type of resources it needs; that way dry runs would stay
		// consistent with the real runs without having to keep the list here in sync.
		v1.SchemeGroupVersion.WithResource("namespaces"), // Used by the defaultevictor plugin
		schedulingv1.SchemeGroupVersion.WithResource("priorityclasses"), // Used by the defaultevictor plugin
		policyv1.SchemeGroupVersion.WithResource("poddisruptionbudgets"), // Used by the defaultevictor plugin
		v1.SchemeGroupVersion.WithResource("persistentvolumeclaims"), // Used by the defaultevictor plugin
	) // Used by the defaultevictor plugin
func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string, eventRecorder events.EventRecorder, client clientset.Interface, sharedInformerFactory informers.SharedInformerFactory, kubeClientSandbox *kubeClientSandbox) (*descheduler, error) {
	podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
	// Temporarily register the PVC because it is used by the DefaultEvictor plugin during
	// the descheduling cycle, where informer registration is ignored.
	_ = sharedInformerFactory.Core().V1().PersistentVolumeClaims().Informer()

	getPodsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer)
	if err != nil {
@@ -174,7 +150,7 @@ func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedu

	podEvictor, err := evictions.NewPodEvictor(
		ctx,
		rs.Client,
		client,
		eventRecorder,
		podInformer,
		rs.DefaultFeatureGates,
@@ -193,44 +169,32 @@ func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedu
	}

	desch := &descheduler{
		rs: rs,
		ir: ir,
		getPodsAssignedToNode: getPodsAssignedToNode,
		sharedInformerFactory: sharedInformerFactory,
		deschedulerPolicy: deschedulerPolicy,
		eventRecorder: eventRecorder,
		podEvictor: podEvictor,
		podEvictionReactionFnc: podEvictionReactionFnc,
		prometheusClient: rs.PrometheusClient,
		queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{Name: "descheduler"}),
		metricsProviders: metricsProviderListToMap(deschedulerPolicy.MetricsProviders),
		rs: rs,
		client: client,
		kubeClientSandbox: kubeClientSandbox,
		getPodsAssignedToNode: getPodsAssignedToNode,
		sharedInformerFactory: sharedInformerFactory,
		deschedulerPolicy: deschedulerPolicy,
		eventRecorder: eventRecorder,
		podEvictor: podEvictor,
		prometheusClient: rs.PrometheusClient,
		queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{Name: "descheduler"}),
		metricsProviders: metricsProviderListToMap(deschedulerPolicy.MetricsProviders),
	}

	nodeSelector, err := nodeSelectorFromPolicy(deschedulerPolicy)
	if err != nil {
		return nil, err
	}

	if err := addNodeSelectorIndexer(sharedInformerFactory, nodeSelector); err != nil {
		return nil, err
	}

	if rs.MetricsClient != nil {
		nodeSelector := labels.Everything()
		if deschedulerPolicy.NodeSelector != nil {
			sel, err := labels.Parse(*deschedulerPolicy.NodeSelector)
			if err != nil {
				return nil, err
			}
			nodeSelector = sel
		}
		desch.metricsCollector = metricscollector.NewMetricsCollector(sharedInformerFactory.Core().V1().Nodes().Lister(), rs.MetricsClient, nodeSelector)
	}

	prometheusProvider := desch.metricsProviders[api.PrometheusMetrics]
	if prometheusProvider != nil && prometheusProvider.Prometheus != nil && prometheusProvider.Prometheus.AuthToken != nil {
		authTokenSecret := prometheusProvider.Prometheus.AuthToken.SecretReference
		if authTokenSecret == nil || authTokenSecret.Namespace == "" {
			return nil, fmt.Errorf("prometheus metrics source configuration is missing authentication token secret")
		}
		if namespacedSharedInformerFactory == nil {
			return nil, fmt.Errorf("namespacedSharedInformerFactory not configured")
		}
		namespacedSharedInformerFactory.Core().V1().Secrets().Informer().AddEventHandler(desch.eventHandler())
		desch.namespacedSecretsLister = namespacedSharedInformerFactory.Core().V1().Secrets().Lister().Secrets(authTokenSecret.Namespace)
	}

	return desch, nil
}

@@ -345,7 +309,7 @@ func (d *descheduler) eventHandler() cache.ResourceEventHandler {
	}
}
func (d *descheduler) runDeschedulerLoop(ctx context.Context, nodes []*v1.Node) error {
func (d *descheduler) runDeschedulerLoop(ctx context.Context) error {
	var span trace.Span
	ctx, span = tracing.Tracer().Start(ctx, "runDeschedulerLoop")
	defer span.End()
@@ -354,52 +318,22 @@ func (d *descheduler) runDeschedulerLoop(ctx context.Context, nodes []*v1.Node)
		metrics.LoopDuration.With(map[string]string{}).Observe(time.Since(loopStartDuration).Seconds())
	}(time.Now())

	// if fewer than two nodes remain, skip this cycle
	if len(nodes) <= 1 {
		klog.InfoS("Skipping descheduling cycle: requires >=2 nodes", "found", len(nodes))
		return nil // gracefully skip this cycle instead of aborting
	}

	var client clientset.Interface
	// When dry run mode is enabled, collect all the relevant objects (mostly pods) under a fake client,
	// so that evicting pods while running multiple strategies in a row has the same cumulative effect
	// as when evicting pods for real.
	if d.rs.DryRun {
		klog.V(3).Infof("Building a cached client from the cluster for the dry run")
		// Create a new cache so we start from scratch without any leftovers
		fakeClient := fakeclientset.NewSimpleClientset()
		// simulate a pod eviction by deleting a pod
		fakeClient.PrependReactor("create", "pods", d.podEvictionReactionFnc(fakeClient))
		fakeSharedInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)

		err := d.ir.CopyTo(fakeClient, fakeSharedInformerFactory)
		if err != nil {
			return err
		}

		// Create a new instance of the shared informer factory from the cached client and
		// register the pod informer, otherwise it will not get started.
		d.getPodsAssignedToNode, err = podutil.BuildGetPodsAssignedToNodeFunc(fakeSharedInformerFactory.Core().V1().Pods().Informer())
		if err != nil {
			return fmt.Errorf("build get pods assigned to node function error: %v", err)
		}

		fakeCtx, cncl := context.WithCancel(context.TODO())
		defer cncl()
		fakeSharedInformerFactory.Start(fakeCtx.Done())
		fakeSharedInformerFactory.WaitForCacheSync(fakeCtx.Done())

		client = fakeClient
		d.sharedInformerFactory = fakeSharedInformerFactory
	} else {
		client = d.rs.Client
	}

	klog.V(3).Infof("Setting up the pod evictor")
	d.podEvictor.SetClient(client)
	klog.V(3).Infof("Resetting pod evictor counters")
	d.podEvictor.ResetCounters()

	d.runProfiles(ctx, client, nodes)
	d.runProfiles(ctx)

	if d.rs.DryRun {
		if d.kubeClientSandbox == nil {
			return fmt.Errorf("kubeClientSandbox is nil in DryRun mode")
		}
		klog.V(3).Infof("Restoring evicted pods from cache")
		if err := d.kubeClientSandbox.restoreEvictedPods(ctx); err != nil {
			klog.ErrorS(err, "Failed to restore evicted pods")
			return fmt.Errorf("failed to restore evicted pods: %w", err)
		}
		d.kubeClientSandbox.reset()
	}

	klog.V(1).InfoS("Number of evictions/requests", "totalEvicted", d.podEvictor.TotalEvicted(), "evictionRequests", d.podEvictor.TotalEvictionRequests())

@@ -409,17 +343,38 @@ func (d *descheduler) runDeschedulerLoop(ctx context.Context, nodes []*v1.Node)
// runProfiles runs all the deschedule plugins of all profiles and
// later runs through all balance plugins of all profiles. (All Balance plugins should come after all Deschedule plugins)
// see https://github.com/kubernetes-sigs/descheduler/issues/979
func (d *descheduler) runProfiles(ctx context.Context, client clientset.Interface, nodes []*v1.Node) {
func (d *descheduler) runProfiles(ctx context.Context) {
	var span trace.Span
	ctx, span = tracing.Tracer().Start(ctx, "runProfiles")
	defer span.End()

	nodesAsInterface, err := d.sharedInformerFactory.Core().V1().Nodes().Informer().GetIndexer().ByIndex(indexerNodeSelectorGlobal, indexerNodeSelectorGlobal)
	if err != nil {
		span.AddEvent("Failed to list nodes with global node selector", trace.WithAttributes(attribute.String("err", err.Error())))
		klog.Error(err)
		return
	}

	nodes, err := nodeutil.ReadyNodesFromInterfaces(nodesAsInterface)
	if err != nil {
		span.AddEvent("Failed to convert node as interfaces into ready nodes", trace.WithAttributes(attribute.String("err", err.Error())))
		klog.Error(err)
		return
	}

	// if fewer than two nodes remain, skip this cycle
	if len(nodes) <= 1 {
		klog.InfoS("Skipping descheduling cycle: requires >=2 nodes", "found", len(nodes))
		return // gracefully skip this cycle instead of aborting
	}

	var profileRunners []profileRunner
	for idx, profile := range d.deschedulerPolicy.Profiles {
		currProfile, err := frameworkprofile.NewProfile(
			ctx,
			profile,
			pluginregistry.PluginRegistry,
			frameworkprofile.WithClientSet(client),
			frameworkprofile.WithClientSet(d.client),
			frameworkprofile.WithSharedInformerFactory(d.sharedInformerFactory),
			frameworkprofile.WithPodEvictor(d.podEvictor),
			frameworkprofile.WithGetPodsAssignedToNodeFnc(d.getPodsAssignedToNode),
@@ -554,27 +509,6 @@ func validateVersionCompatibility(discovery discovery.DiscoveryInterface, desche
	return nil
}

func podEvictionReactionFnc(fakeClient *fakeclientset.Clientset) func(action core.Action) (bool, runtime.Object, error) {
	return func(action core.Action) (bool, runtime.Object, error) {
		if action.GetSubresource() == "eviction" {
			createAct, matched := action.(core.CreateActionImpl)
			if !matched {
				return false, nil, fmt.Errorf("unable to convert action to core.CreateActionImpl")
			}
			eviction, matched := createAct.Object.(*policy.Eviction)
			if !matched {
				return false, nil, fmt.Errorf("unable to convert action object into *policy.Eviction")
			}
			if err := fakeClient.Tracker().Delete(action.GetResource(), eviction.GetNamespace(), eviction.GetName()); err != nil {
				return false, nil, fmt.Errorf("unable to delete pod %v/%v: %v", eviction.GetNamespace(), eviction.GetName(), err)
			}
			return true, nil, nil
		}
		// fallback to the default reactor
		return false, nil, nil
	}
}
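The reactor above intercepts "create pods/eviction" actions on the fake client and turns them into deletes against the fake tracker. A hedged sketch (not part of the diff) of wiring such a reaction into a fake clientset and triggering it through the eviction subresource; "reaction" stands for a reaction function like the one above and "somePod" is a placeholder *v1.Pod:

fakeClient := fakeclientset.NewSimpleClientset(somePod)
fakeClient.PrependReactor("create", "pods", reaction)
err := fakeClient.PolicyV1().Evictions(somePod.Namespace).Evict(ctx, &policy.Eviction{
	ObjectMeta: metav1.ObjectMeta{Name: somePod.Name, Namespace: somePod.Namespace},
})
// On success the reactor has removed somePod from the fake client's tracker.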
type tokenReconciliation int

const (
@@ -590,11 +524,6 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer

	sharedInformerFactory := informers.NewSharedInformerFactoryWithOptions(rs.Client, 0, informers.WithTransform(trimManagedFields))

	var nodeSelector string
	if deschedulerPolicy.NodeSelector != nil {
		nodeSelector = *deschedulerPolicy.NodeSelector
	}

	var eventClient clientset.Interface
	if rs.DryRun {
		eventClient = fakeclientset.NewSimpleClientset()
@@ -619,7 +548,8 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
		}
	}

	descheduler, err := newDescheduler(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion, eventRecorder, sharedInformerFactory, namespacedSharedInformerFactory)
	// Always create descheduler with real client/factory first to register all informers
	descheduler, err := newDescheduler(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion, eventRecorder, rs.Client, sharedInformerFactory, nil)
	if err != nil {
		span.AddEvent("Failed to create new descheduler", trace.WithAttributes(attribute.String("err", err.Error())))
		return err
@@ -627,17 +557,52 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Setup Prometheus provider (only for real client case, not for dry run)
	if err := setupPrometheusProvider(descheduler, namespacedSharedInformerFactory); err != nil {
		span.AddEvent("Failed to setup Prometheus provider", trace.WithAttributes(attribute.String("err", err.Error())))
		return err
	}

	// If in dry run mode, replace the descheduler with one using fake client/factory
	if rs.DryRun {
		// Create sandbox with resources to mirror from real client
		kubeClientSandbox, err := newDefaultKubeClientSandbox(rs.Client, sharedInformerFactory)
		if err != nil {
			span.AddEvent("Failed to create kube client sandbox", trace.WithAttributes(attribute.String("err", err.Error())))
			return fmt.Errorf("failed to create kube client sandbox: %v", err)
		}

		klog.V(3).Infof("Building a cached client from the cluster for the dry run")

		// TODO(ingvagabund): drop the previous queue
		// TODO(ingvagabund): stop the previous pod evictor
		// Replace descheduler with one using fake client/factory
		descheduler, err = newDescheduler(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion, eventRecorder, kubeClientSandbox.fakeClient(), kubeClientSandbox.fakeSharedInformerFactory(), kubeClientSandbox)
		if err != nil {
			span.AddEvent("Failed to create dry run descheduler", trace.WithAttributes(attribute.String("err", err.Error())))
			return err
		}
	}

	// In dry run mode, start and sync the fake shared informer factory so it can mirror
	// events from the real factory. Reliable propagation depends on both factories being
	// fully synced (see WaitForCacheSync calls below), not solely on startup order.
	if rs.DryRun {
		descheduler.kubeClientSandbox.fakeSharedInformerFactory().Start(ctx.Done())
		descheduler.kubeClientSandbox.fakeSharedInformerFactory().WaitForCacheSync(ctx.Done())
	}
	sharedInformerFactory.Start(ctx.Done())
	if metricProviderTokenReconciliation == secretReconciliation {
		namespacedSharedInformerFactory.Start(ctx.Done())
	}

	sharedInformerFactory.WaitForCacheSync(ctx.Done())
	descheduler.podEvictor.WaitForEventHandlersSync(ctx)
	if metricProviderTokenReconciliation == secretReconciliation {
		namespacedSharedInformerFactory.WaitForCacheSync(ctx.Done())
	}

	descheduler.podEvictor.WaitForEventHandlersSync(ctx)

	if descheduler.metricsCollector != nil {
		go func() {
			klog.V(2).Infof("Starting metrics collector")

@@ -669,14 +634,7 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
		sCtx, sSpan := tracing.Tracer().Start(ctx, "NonSlidingUntil")
		defer sSpan.End()

		nodes, err := nodeutil.ReadyNodes(sCtx, rs.Client, descheduler.sharedInformerFactory.Core().V1().Nodes().Lister(), nodeSelector)
		if err != nil {
			sSpan.AddEvent("Failed to detect ready nodes", trace.WithAttributes(attribute.String("err", err.Error())))
			klog.Error(err)
			cancel()
			return
		}
		err = descheduler.runDeschedulerLoop(sCtx, nodes)
		err = descheduler.runDeschedulerLoop(sCtx)
		if err != nil {
			sSpan.AddEvent("Failed to run descheduler loop", trace.WithAttributes(attribute.String("err", err.Error())))
			klog.Error(err)
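The dry-run flow above builds the descheduler twice: first against the real client and factory so every informer gets registered, then again against the sandbox's fake client and factory. The kubeClientSandbox internals are not part of this diff; one hypothetical way such a sandbox could mirror real-informer events into a fake client is a resource event handler that forwards objects to the fake tracker (fakeClient and podsGVR are placeholders here):

realInformer := sharedInformerFactory.Core().V1().Pods().Informer()
realInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) {
		if o, ok := obj.(runtime.Object); ok {
			_ = fakeClient.Tracker().Add(o)
		}
	},
	// UpdateFunc and DeleteFunc would forward to Tracker().Update / Tracker().Delete
	// with podsGVR and the object's namespace in the same way.
})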
@@ -6,20 +6,27 @@ import (
	"fmt"
	"math/rand"
	"net/http"
	"net/url"
	"strings"
	"testing"
	"time"

	promapi "github.com/prometheus/client_golang/api"
	v1 "k8s.io/api/core/v1"
	policy "k8s.io/api/policy/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/wait"
	apiversion "k8s.io/apimachinery/pkg/version"
	fakediscovery "k8s.io/client-go/discovery/fake"
	"k8s.io/client-go/informers"
	fakeclientset "k8s.io/client-go/kubernetes/fake"
	core "k8s.io/client-go/testing"
	"k8s.io/client-go/tools/cache"
	"k8s.io/component-base/featuregate"
	"k8s.io/klog/v2"
	"k8s.io/metrics/pkg/apis/metrics/v1beta1"
@@ -31,16 +38,32 @@ import (
	"sigs.k8s.io/descheduler/pkg/api"
	"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
	"sigs.k8s.io/descheduler/pkg/features"
	fakeplugin "sigs.k8s.io/descheduler/pkg/framework/fake/plugin"
	"sigs.k8s.io/descheduler/pkg/framework/pluginregistry"
	"sigs.k8s.io/descheduler/pkg/framework/plugins/defaultevictor"
	"sigs.k8s.io/descheduler/pkg/framework/plugins/nodeutilization"
	"sigs.k8s.io/descheduler/pkg/framework/plugins/removeduplicates"
	"sigs.k8s.io/descheduler/pkg/framework/plugins/removepodsviolatingnodetaints"
	frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
	"sigs.k8s.io/descheduler/pkg/utils"
	deschedulerversion "sigs.k8s.io/descheduler/pkg/version"
	"sigs.k8s.io/descheduler/test"
)
type mockPrometheusClient struct {
	name string
}

func (m *mockPrometheusClient) URL(ep string, args map[string]string) *url.URL {
	return nil
}

func (m *mockPrometheusClient) Do(ctx context.Context, req *http.Request) (*http.Response, []byte, error) {
	return nil, nil, nil
}

var _ promapi.Client = &mockPrometheusClient{}

var (
	podEvictionError = errors.New("PodEvictionError")
	tooManyRequestsError = &apierrors.StatusError{
@@ -177,7 +200,7 @@ func lowNodeUtilizationPolicy(thresholds, targetThresholds api.ResourceThreshold
	}
}

func initDescheduler(t *testing.T, ctx context.Context, featureGates featuregate.FeatureGate, internalDeschedulerPolicy *api.DeschedulerPolicy, metricsClient metricsclient.Interface, objects ...runtime.Object) (*options.DeschedulerServer, *descheduler, *fakeclientset.Clientset) {
func initDescheduler(t *testing.T, ctx context.Context, featureGates featuregate.FeatureGate, internalDeschedulerPolicy *api.DeschedulerPolicy, metricsClient metricsclient.Interface, dryRun bool, objects ...runtime.Object) (*options.DeschedulerServer, *descheduler, *fakeclientset.Clientset) {
	client := fakeclientset.NewSimpleClientset(objects...)
	eventClient := fakeclientset.NewSimpleClientset(objects...)

@@ -189,19 +212,73 @@ func initDescheduler(t *testing.T, ctx context.Context, featureGates featuregate
	rs.EventClient = eventClient
	rs.DefaultFeatureGates = featureGates
	rs.MetricsClient = metricsClient
	rs.DryRun = dryRun

	sharedInformerFactory := informers.NewSharedInformerFactoryWithOptions(rs.Client, 0, informers.WithTransform(trimManagedFields))
	eventBroadcaster, eventRecorder := utils.GetRecorderAndBroadcaster(ctx, client)

	descheduler, err := newDescheduler(ctx, rs, internalDeschedulerPolicy, "v1", eventRecorder, sharedInformerFactory, nil)
	// Always create descheduler with real client/factory first to register all informers
	descheduler, err := newDescheduler(ctx, rs, internalDeschedulerPolicy, "v1", eventRecorder, rs.Client, sharedInformerFactory, nil)
	if err != nil {
		eventBroadcaster.Shutdown()
		t.Fatalf("Unable to create a descheduler instance: %v", err)
		t.Fatalf("Unable to create descheduler instance: %v", err)
	}

	// Setup Prometheus provider (only for real client case, not for dry run)
	if err := setupPrometheusProvider(descheduler, nil); err != nil {
		eventBroadcaster.Shutdown()
		t.Fatalf("Failed to setup Prometheus provider: %v", err)
	}

	// If in dry run mode, replace the descheduler with one using fake client/factory
	if dryRun {
		// Create sandbox with resources to mirror from real client
		kubeClientSandbox, err := newDefaultKubeClientSandbox(rs.Client, sharedInformerFactory)
		if err != nil {
			eventBroadcaster.Shutdown()
			t.Fatalf("Failed to create kube client sandbox: %v", err)
		}

		// Replace descheduler with one using fake client/factory
		descheduler, err = newDescheduler(ctx, rs, internalDeschedulerPolicy, "v1", eventRecorder, kubeClientSandbox.fakeClient(), kubeClientSandbox.fakeSharedInformerFactory(), kubeClientSandbox)
		if err != nil {
			eventBroadcaster.Shutdown()
			t.Fatalf("Unable to create dry run descheduler instance: %v", err)
		}
	}

	// Start the real shared informer factory after creating the descheduler
	if dryRun {
		descheduler.kubeClientSandbox.fakeSharedInformerFactory().Start(ctx.Done())
		descheduler.kubeClientSandbox.fakeSharedInformerFactory().WaitForCacheSync(ctx.Done())
	}
	sharedInformerFactory.Start(ctx.Done())
	sharedInformerFactory.WaitForCacheSync(ctx.Done())

	if dryRun {
		if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
			for _, obj := range objects {
				exists, err := descheduler.kubeClientSandbox.hasRuntimeObjectInIndexer(obj)
				if err != nil {
					return false, err
				}
				metaObj, err := meta.Accessor(obj)
				if err != nil {
					return false, fmt.Errorf("failed to get object metadata: %w", err)
				}
				key := cache.MetaObjectToName(metaObj).String()
				if !exists {
					klog.Infof("Object %q has not propagated to the indexer", key)
					return false, nil
				}
				klog.Infof("Object %q has propagated to the indexer", key)
			}
			return true, nil
		}); err != nil {
			t.Fatalf("nodes did not propagate to the indexer: %v", err)
		}
	}

	return rs, descheduler, client
}

@@ -477,71 +554,92 @@ func taintNodeNoSchedule(node *v1.Node) {
func TestPodEvictorReset(t *testing.T) {
	initPluginRegistry()

	ctx := context.Background()
	node1 := test.BuildTestNode("n1", 2000, 3000, 10, taintNodeNoSchedule)
	node2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
	nodes := []*v1.Node{node1, node2}

	ownerRef1 := test.GetReplicaSetOwnerRefList()
	updatePod := func(pod *v1.Pod) {
		pod.Namespace = "dev"
		pod.ObjectMeta.OwnerReferences = ownerRef1
	tests := []struct {
		name string
		dryRun bool
		cycles []struct {
			expectedTotalEvicted uint
			expectedRealEvictions int
			expectedFakeEvictions int
		}
	}{
		{
			name: "real mode",
			dryRun: false,
			cycles: []struct {
				expectedTotalEvicted uint
				expectedRealEvictions int
				expectedFakeEvictions int
			}{
				{expectedTotalEvicted: 2, expectedRealEvictions: 2, expectedFakeEvictions: 0},
				{expectedTotalEvicted: 2, expectedRealEvictions: 4, expectedFakeEvictions: 0},
			},
		},
		{
			name: "dry mode",
			dryRun: true,
			cycles: []struct {
				expectedTotalEvicted uint
				expectedRealEvictions int
				expectedFakeEvictions int
			}{
				{expectedTotalEvicted: 2, expectedRealEvictions: 0, expectedFakeEvictions: 2},
				{expectedTotalEvicted: 2, expectedRealEvictions: 0, expectedFakeEvictions: 4},
			},
		},
	}

	p1 := test.BuildTestPod("p1", 100, 0, node1.Name, updatePod)
	p2 := test.BuildTestPod("p2", 100, 0, node1.Name, updatePod)
	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			ctx := context.Background()
			node1 := test.BuildTestNode("n1", 2000, 3000, 10, taintNodeNoSchedule)
			node2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)

			internalDeschedulerPolicy := removePodsViolatingNodeTaintsPolicy()
			ctxCancel, cancel := context.WithCancel(ctx)
			rs, descheduler, client := initDescheduler(t, ctxCancel, initFeatureGates(), internalDeschedulerPolicy, nil, node1, node2, p1, p2)
			defer cancel()
			p1 := test.BuildTestPod("p1", 100, 0, node1.Name, test.SetRSOwnerRef)
			p2 := test.BuildTestPod("p2", 100, 0, node1.Name, test.SetRSOwnerRef)

			var evictedPods []string
			client.PrependReactor("create", "pods", podEvictionReactionTestingFnc(&evictedPods, nil, nil))
			internalDeschedulerPolicy := removePodsViolatingNodeTaintsPolicy()
			ctxCancel, cancel := context.WithCancel(ctx)
			_, descheduler, client := initDescheduler(t, ctxCancel, initFeatureGates(), internalDeschedulerPolicy, nil, tc.dryRun, node1, node2, p1, p2)
			defer cancel()

			var fakeEvictedPods []string
			descheduler.podEvictionReactionFnc = func(*fakeclientset.Clientset) func(action core.Action) (bool, runtime.Object, error) {
				return podEvictionReactionTestingFnc(&fakeEvictedPods, nil, nil)
			var evictedPods []string
			client.PrependReactor("create", "pods", podEvictionReactionTestingFnc(&evictedPods, nil, nil))

			var fakeEvictedPods []string
			for i, cycle := range tc.cycles {
				evictedPodNames := runDeschedulerLoopAndGetEvictedPods(ctx, t, descheduler, tc.dryRun)
				fakeEvictedPods = append(fakeEvictedPods, evictedPodNames...)

				if descheduler.podEvictor.TotalEvicted() != cycle.expectedTotalEvicted || len(evictedPods) != cycle.expectedRealEvictions || len(fakeEvictedPods) != cycle.expectedFakeEvictions {
					t.Fatalf("Cycle %d: Expected (%v,%v,%v) pods evicted, got (%v,%v,%v) instead", i+1, cycle.expectedTotalEvicted, cycle.expectedRealEvictions, cycle.expectedFakeEvictions, descheduler.podEvictor.TotalEvicted(), len(evictedPods), len(fakeEvictedPods))
				}
			}
		})
	}
}
// runDeschedulerLoopAndGetEvictedPods runs a descheduling cycle and returns the names of evicted pods.
// This is similar to runDeschedulerLoop but captures evicted pod names before the cache is reset.
func runDeschedulerLoopAndGetEvictedPods(ctx context.Context, t *testing.T, d *descheduler, dryRun bool) []string {
	d.podEvictor.ResetCounters()

	d.runProfiles(ctx)

	var evictedPodNames []string
	if dryRun {
		evictedPodsFromCache := d.kubeClientSandbox.evictedPodsCache.list()
		for _, pod := range evictedPodsFromCache {
			evictedPodNames = append(evictedPodNames, pod.Name)
		}

		if err := d.kubeClientSandbox.restoreEvictedPods(ctx); err != nil {
			t.Fatalf("Failed to restore evicted pods: %v", err)
		}
		d.kubeClientSandbox.reset()
	}

	// a single pod eviction expected
	klog.Infof("2 pod eviction expected per a descheduling cycle, 2 real evictions in total")
	if err := descheduler.runDeschedulerLoop(ctx, nodes); err != nil {
		t.Fatalf("Unable to run a descheduling loop: %v", err)
	}
	if descheduler.podEvictor.TotalEvicted() != 2 || len(evictedPods) != 2 || len(fakeEvictedPods) != 0 {
		t.Fatalf("Expected (2,2,0) pods evicted, got (%v, %v, %v) instead", descheduler.podEvictor.TotalEvicted(), len(evictedPods), len(fakeEvictedPods))
	}

	// a single pod eviction expected
	klog.Infof("2 pod eviction expected per a descheduling cycle, 4 real evictions in total")
	if err := descheduler.runDeschedulerLoop(ctx, nodes); err != nil {
		t.Fatalf("Unable to run a descheduling loop: %v", err)
	}
	if descheduler.podEvictor.TotalEvicted() != 2 || len(evictedPods) != 4 || len(fakeEvictedPods) != 0 {
		t.Fatalf("Expected (2,4,0) pods evicted, got (%v, %v, %v) instead", descheduler.podEvictor.TotalEvicted(), len(evictedPods), len(fakeEvictedPods))
	}

	// check the fake client syncing and the right pods evicted
	klog.Infof("Enabling the dry run mode")
	rs.DryRun = true
	evictedPods = []string{}

	klog.Infof("2 pod eviction expected per a descheduling cycle, 2 fake evictions in total")
	if err := descheduler.runDeschedulerLoop(ctx, nodes); err != nil {
		t.Fatalf("Unable to run a descheduling loop: %v", err)
	}
	if descheduler.podEvictor.TotalEvicted() != 2 || len(evictedPods) != 0 || len(fakeEvictedPods) != 2 {
		t.Fatalf("Expected (2,0,2) pods evicted, got (%v, %v, %v) instead", descheduler.podEvictor.TotalEvicted(), len(evictedPods), len(fakeEvictedPods))
	}

	klog.Infof("2 pod eviction expected per a descheduling cycle, 4 fake evictions in total")
	if err := descheduler.runDeschedulerLoop(ctx, nodes); err != nil {
		t.Fatalf("Unable to run a descheduling loop: %v", err)
	}
	if descheduler.podEvictor.TotalEvicted() != 2 || len(evictedPods) != 0 || len(fakeEvictedPods) != 4 {
		t.Fatalf("Expected (2,0,4) pods evicted, got (%v, %v, %v) instead", descheduler.podEvictor.TotalEvicted(), len(evictedPods), len(fakeEvictedPods))
	}
	return evictedPodNames
}
func checkTotals(t *testing.T, ctx context.Context, descheduler *descheduler, totalEvictionRequests, totalEvicted uint) {
@@ -555,7 +653,7 @@ func checkTotals(t *testing.T, ctx context.Context, descheduler *descheduler, to
}

func runDeschedulingCycleAndCheckTotals(t *testing.T, ctx context.Context, nodes []*v1.Node, descheduler *descheduler, totalEvictionRequests, totalEvicted uint) {
	err := descheduler.runDeschedulerLoop(ctx, nodes)
	err := descheduler.runDeschedulerLoop(ctx)
	if err != nil {
		t.Fatalf("Unable to run a descheduling loop: %v", err)
	}
@@ -595,14 +693,9 @@ func TestEvictionRequestsCache(t *testing.T) {
	featureGates.Add(map[featuregate.Feature]featuregate.FeatureSpec{
		features.EvictionsInBackground: {Default: true, PreRelease: featuregate.Alpha},
	})
	_, descheduler, client := initDescheduler(t, ctxCancel, featureGates, internalDeschedulerPolicy, nil, node1, node2, p1, p2, p3, p4)
	_, descheduler, client := initDescheduler(t, ctxCancel, featureGates, internalDeschedulerPolicy, nil, false, node1, node2, p1, p2, p3, p4)
	defer cancel()

	var fakeEvictedPods []string
	descheduler.podEvictionReactionFnc = func(*fakeclientset.Clientset) func(action core.Action) (bool, runtime.Object, error) {
		return podEvictionReactionTestingFnc(&fakeEvictedPods, nil, podEvictionError)
	}

	var evictedPods []string
	client.PrependReactor("create", "pods", podEvictionReactionTestingFnc(&evictedPods, func(name string) bool { return name == "p1" || name == "p2" }, nil))

@@ -731,20 +824,14 @@ func TestDeschedulingLimits(t *testing.T) {
	ctx := context.Background()
	node1 := test.BuildTestNode("n1", 2000, 3000, 10, taintNodeNoSchedule)
	node2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
	nodes := []*v1.Node{node1, node2}
	ctxCancel, cancel := context.WithCancel(ctx)
	featureGates := featuregate.NewFeatureGate()
	featureGates.Add(map[featuregate.Feature]featuregate.FeatureSpec{
		features.EvictionsInBackground: {Default: true, PreRelease: featuregate.Alpha},
	})
	_, descheduler, client := initDescheduler(t, ctxCancel, featureGates, tc.policy, nil, node1, node2)
	_, descheduler, client := initDescheduler(t, ctxCancel, featureGates, tc.policy, nil, false, node1, node2)
	defer cancel()

	var fakeEvictedPods []string
	descheduler.podEvictionReactionFnc = func(*fakeclientset.Clientset) func(action core.Action) (bool, runtime.Object, error) {
		return podEvictionReactionTestingFnc(&fakeEvictedPods, nil, podEvictionError)
	}

	var evictedPods []string
	client.PrependReactor("create", "pods", podEvictionReactionTestingFnc(&evictedPods, func(name string) bool { return name == "p1" || name == "p2" }, nil))

@@ -774,7 +861,7 @@ func TestDeschedulingLimits(t *testing.T) {
	time.Sleep(100 * time.Millisecond)

	klog.Infof("2 evictions in background expected, 2 normal evictions")
	err := descheduler.runDeschedulerLoop(ctx, nodes)
	err := descheduler.runDeschedulerLoop(ctx)
	if err != nil {
		t.Fatalf("Unable to run a descheduling loop: %v", err)
	}
@@ -790,6 +877,217 @@ func TestDeschedulingLimits(t *testing.T) {
	}
}
func TestNodeLabelSelectorBasedEviction(t *testing.T) {
	initPluginRegistry()

	// createNodes creates 4 nodes with different labels and applies a taint to all of them
	createNodes := func() (*v1.Node, *v1.Node, *v1.Node, *v1.Node) {
		taint := []v1.Taint{
			{
				Key: "test-taint",
				Value: "test-value",
				Effect: v1.TaintEffectNoSchedule,
			},
		}
		node1 := test.BuildTestNode("n1", 2000, 3000, 10, func(node *v1.Node) {
			node.Labels = map[string]string{
				"zone": "us-east-1a",
				"node-type": "compute",
				"environment": "production",
			}
			node.Spec.Taints = taint
		})
		node2 := test.BuildTestNode("n2", 2000, 3000, 10, func(node *v1.Node) {
			node.Labels = map[string]string{
				"zone": "us-east-1b",
				"node-type": "compute",
				"environment": "production",
			}
			node.Spec.Taints = taint
		})
		node3 := test.BuildTestNode("n3", 2000, 3000, 10, func(node *v1.Node) {
			node.Labels = map[string]string{
				"zone": "us-west-1a",
				"node-type": "storage",
				"environment": "staging",
			}
			node.Spec.Taints = taint
		})
		node4 := test.BuildTestNode("n4", 2000, 3000, 10, func(node *v1.Node) {
			node.Labels = map[string]string{
				"zone": "us-west-1b",
				"node-type": "storage",
				"environment": "staging",
			}
			node.Spec.Taints = taint
		})
		return node1, node2, node3, node4
	}

	tests := []struct {
		description string
		nodeSelector string
		dryRun bool
		expectedEvictedFromNodes []string
	}{
		{
			description: "Evict from n1, n2",
			nodeSelector: "environment=production",
			dryRun: false,
			expectedEvictedFromNodes: []string{"n1", "n2"},
		},
		{
			description: "Evict from n1, n2 in dry run mode",
			nodeSelector: "environment=production",
			dryRun: true,
			expectedEvictedFromNodes: []string{"n1", "n2"},
		},
		{
			description: "Evict from n3, n4",
			nodeSelector: "environment=staging",
			dryRun: false,
			expectedEvictedFromNodes: []string{"n3", "n4"},
		},
		{
			description: "Evict from n3, n4 in dry run mode",
			nodeSelector: "environment=staging",
			dryRun: true,
			expectedEvictedFromNodes: []string{"n3", "n4"},
		},
		{
			description: "Evict from n1, n4",
			nodeSelector: "zone in (us-east-1a, us-west-1b)",
			dryRun: false,
			expectedEvictedFromNodes: []string{"n1", "n4"},
		},
		{
			description: "Evict from n1, n4 in dry run mode",
			nodeSelector: "zone in (us-east-1a, us-west-1b)",
			dryRun: true,
			expectedEvictedFromNodes: []string{"n1", "n4"},
		},
		{
			description: "Evict from n2, n3",
			nodeSelector: "zone in (us-east-1b, us-west-1a)",
			dryRun: false,
			expectedEvictedFromNodes: []string{"n2", "n3"},
		},
		{
			description: "Evict from n2, n3 in dry run mode",
			nodeSelector: "zone in (us-east-1b, us-west-1a)",
			dryRun: true,
			expectedEvictedFromNodes: []string{"n2", "n3"},
		},
		{
			description: "Evict from all nodes",
			nodeSelector: "",
			dryRun: false,
			expectedEvictedFromNodes: []string{"n1", "n2", "n3", "n4"},
		},
		{
			description: "Evict from all nodes in dry run mode",
			nodeSelector: "",
			dryRun: true,
			expectedEvictedFromNodes: []string{"n1", "n2", "n3", "n4"},
		},
	}

	for _, tc := range tests {
		t.Run(tc.description, func(t *testing.T) {
			ctx := context.Background()

			// Create nodes with different labels and taints
			node1, node2, node3, node4 := createNodes()

			ownerRef := test.GetReplicaSetOwnerRefList()
			updatePod := func(pod *v1.Pod) {
				pod.ObjectMeta.OwnerReferences = ownerRef
				pod.Status.Phase = v1.PodRunning
			}

			// Create one pod per node
			p1 := test.BuildTestPod("p1", 200, 0, node1.Name, updatePod)
			p2 := test.BuildTestPod("p2", 200, 0, node2.Name, updatePod)
			p3 := test.BuildTestPod("p3", 200, 0, node3.Name, updatePod)
			p4 := test.BuildTestPod("p4", 200, 0, node4.Name, updatePod)

			objects := []runtime.Object{node1, node2, node3, node4, p1, p2, p3, p4}

			// Map pod names to their node names for validation
			podToNode := map[string]string{
				"p1": "n1",
				"p2": "n2",
				"p3": "n3",
				"p4": "n4",
			}

			policy := removePodsViolatingNodeTaintsPolicy()
			if tc.nodeSelector != "" {
				policy.NodeSelector = &tc.nodeSelector
			}

			ctxCancel, cancel := context.WithCancel(ctx)
			_, deschedulerInstance, client := initDescheduler(t, ctxCancel, initFeatureGates(), policy, nil, tc.dryRun, objects...)
			defer cancel()

			// Verify all pods are created initially
			pods, err := client.CoreV1().Pods(p1.Namespace).List(ctx, metav1.ListOptions{})
			if err != nil {
				t.Fatalf("Unable to list pods: %v", err)
			}
			if len(pods.Items) != 4 {
				t.Errorf("Expected 4 pods initially, got %d", len(pods.Items))
			}

			var evictedPods []string
			if !tc.dryRun {
				client.PrependReactor("create", "pods", podEvictionReactionTestingFnc(&evictedPods, nil, nil))
			}

			evictedPodNames := runDeschedulerLoopAndGetEvictedPods(ctx, t, deschedulerInstance, tc.dryRun)
			if tc.dryRun {
				evictedPods = evictedPodNames
			}

			// Collect which nodes had pods evicted from them
			nodesWithEvictedPods := make(map[string]bool)
			for _, podName := range evictedPods {
				if nodeName, ok := podToNode[podName]; ok {
					nodesWithEvictedPods[nodeName] = true
				}
			}

			// Verify the correct number of nodes had pods evicted
			if len(nodesWithEvictedPods) != len(tc.expectedEvictedFromNodes) {
				t.Fatalf("Expected pods to be evicted from %d nodes, got %d nodes: %v", len(tc.expectedEvictedFromNodes), len(nodesWithEvictedPods), nodesWithEvictedPods)
			}

			// Verify pods were evicted from the correct nodes
			for _, nodeName := range tc.expectedEvictedFromNodes {
				if !nodesWithEvictedPods[nodeName] {
					t.Fatalf("Expected pod to be evicted from node %s, but it was not", nodeName)
				}
			}

			// Verify no unexpected nodes had pods evicted
			for nodeName := range nodesWithEvictedPods {
				found := false
				for _, expectedNode := range tc.expectedEvictedFromNodes {
					if nodeName == expectedNode {
						found = true
						break
					}
				}
				if !found {
					t.Fatalf("Unexpected eviction from node %s", nodeName)
				}
			}

			t.Logf("Successfully evicted pods from nodes: %v", tc.expectedEvictedFromNodes)
		})
	}
}
func TestLoadAwareDescheduling(t *testing.T) {
	initPluginRegistry()

@@ -801,7 +1099,6 @@ func TestLoadAwareDescheduling(t *testing.T) {
	ctx := context.Background()
	node1 := test.BuildTestNode("n1", 2000, 3000, 10, taintNodeNoSchedule)
	node2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
	nodes := []*v1.Node{node1, node2}

	p1 := test.BuildTestPod("p1", 300, 0, node1.Name, updatePod)
	p2 := test.BuildTestPod("p2", 300, 0, node1.Name, updatePod)
@@ -850,6 +1147,7 @@ func TestLoadAwareDescheduling(t *testing.T) {
		initFeatureGates(),
		policy,
		metricsClientset,
		false,
		node1, node2, p1, p2, p3, p4, p5)
	defer cancel()

@@ -857,7 +1155,7 @@ func TestLoadAwareDescheduling(t *testing.T) {
	// after newDescheduler in RunDeschedulerStrategies.
	descheduler.metricsCollector.Collect(ctx)

	err := descheduler.runDeschedulerLoop(ctx, nodes)
	err := descheduler.runDeschedulerLoop(ctx)
	if err != nil {
		t.Fatalf("Unable to run a descheduling loop: %v", err)
	}
@@ -867,3 +1165,407 @@ func TestLoadAwareDescheduling(t *testing.T) {
	}
	t.Logf("Total evictions: %v", totalEs)
}
func TestPodEvictionReactionFncErrorHandling(t *testing.T) {
	podsGVR := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}

	testCases := []struct {
		name string
		setupFnc func(*fakeclientset.Clientset) (name, namespace string)
		expectHandled bool
		expectError bool
		errorContains string
		expectedCacheLen int
	}{
		{
			name: "handles pod eviction successfully and adds to cache",
			setupFnc: func(fakeClient *fakeclientset.Clientset) (string, string) {
				pod := test.BuildTestPod("pod1", 100, 0, "node1", test.SetRSOwnerRef)
				err := fakeClient.Tracker().Add(pod)
				if err != nil {
					t.Fatalf("Failed to add pod: %v", err)
				}
				return pod.Name, pod.Namespace
			},
			expectHandled: true,
			expectError: false,
			expectedCacheLen: 1,
		},
		{
			name: "returns false and error when delete fails allowing other reactors to handle",
			setupFnc: func(fakeClient *fakeclientset.Clientset) (string, string) {
				pod := test.BuildTestPod("pod1", 100, 0, "node1", test.SetRSOwnerRef)
				if err := fakeClient.Tracker().Add(pod); err != nil {
					t.Fatalf("Failed to add pod: %v", err)
				}
				if err := fakeClient.Tracker().Delete(podsGVR, pod.Namespace, pod.Name); err != nil {
					t.Fatalf("Failed to pre-delete pod: %v", err)
				}
				return pod.Name, pod.Namespace
			},
			expectHandled: false,
			expectError: true,
			errorContains: "unable to delete pod",
			expectedCacheLen: 0,
		},
		{
			name: "returns error when pod doesn't exist in tracker from the start",
			setupFnc: func(fakeClient *fakeclientset.Clientset) (string, string) {
				// Don't add the pod to the tracker at all
				return "nonexistent-pod", "default"
			},
			expectHandled: false,
			expectError: true,
			errorContains: "unable to delete pod",
			expectedCacheLen: 0,
		},
		{
			name: "returns error when object is not a pod",
			setupFnc: func(fakeClient *fakeclientset.Clientset) (string, string) {
				configMap := &v1.ConfigMap{
					ObjectMeta: metav1.ObjectMeta{
						Name: "test-config",
						Namespace: "default",
					},
				}
				if err := fakeClient.Tracker().Create(podsGVR, configMap, "default"); err != nil {
					t.Fatalf("Failed to add ConfigMap to pods resource: %v", err)
				}
				return configMap.Name, configMap.Namespace
			},
			expectHandled: false,
			expectError: true,
			errorContains: "unable to convert object to *v1.Pod",
			expectedCacheLen: 0,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			fakeClient := fakeclientset.NewSimpleClientset()
			cache := newEvictedPodsCache()

			name, namespace := tc.setupFnc(fakeClient)

			reactionFnc := podEvictionReactionFnc(fakeClient, cache)

			handled, _, err := reactionFnc(core.NewCreateSubresourceAction(
				podsGVR,
				name,
				"eviction",
				namespace,
				&policy.Eviction{
					ObjectMeta: metav1.ObjectMeta{
						Name: name,
						Namespace: namespace,
					},
				},
			))

			if handled != tc.expectHandled {
				t.Errorf("Expected handled=%v, got %v", tc.expectHandled, handled)
			}

			if tc.expectError {
				if err == nil {
					t.Fatal("Expected error, got nil")
				}
				if !strings.Contains(err.Error(), tc.errorContains) {
					t.Errorf("Expected error message to contain '%s', got: %v", tc.errorContains, err)
				}
			} else {
				if err != nil {
					t.Errorf("Expected no error, got: %v", err)
				}
			}

			if len(cache.list()) != tc.expectedCacheLen {
				t.Errorf("Expected %d pods in cache, got %d", tc.expectedCacheLen, len(cache.list()))
			}
		})
	}
}
// verifyPodIdentityFields checks if name, namespace, and UID match expected values
|
||||
func verifyPodIdentityFields(t *testing.T, name, namespace, uid, expectedName, expectedNamespace, expectedUID, context string) {
|
||||
t.Helper()
|
||||
if name != expectedName {
|
||||
t.Fatalf("Expected pod name %s%s, got %s", expectedName, context, name)
|
||||
}
|
||||
if namespace != expectedNamespace {
|
||||
t.Fatalf("Expected pod namespace %s%s, got %s", expectedNamespace, context, namespace)
|
||||
}
|
||||
if uid != expectedUID {
|
||||
t.Fatalf("Expected pod UID %s%s, got %s", expectedUID, context, uid)
|
||||
}
|
||||
}
|
||||
|
||||
// verifyPodIdentity checks if a pod has the expected name, namespace, and UID
|
||||
func verifyPodIdentity(t *testing.T, pod *v1.Pod, expectedName, expectedNamespace string, expectedUID types.UID) {
|
||||
t.Helper()
|
||||
verifyPodIdentityFields(t, pod.Name, pod.Namespace, string(pod.UID), expectedName, expectedNamespace, string(expectedUID), "")
|
||||
}
|
||||
|
||||

func TestEvictedPodRestorationInDryRun(t *testing.T) {
	// Initialize klog flags
	// klog.InitFlags(nil)

	// Set verbosity level (higher number = more verbose)
	// 0 = errors only, 1-4 = info, 5-9 = debug, 10+ = trace
	// flag.Set("v", "4")

	initPluginRegistry()

	ctx := context.Background()
	node1 := test.BuildTestNode("n1", 2000, 3000, 10, taintNodeNoSchedule)
	node2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)

	p1 := test.BuildTestPod("p1", 100, 0, node1.Name, test.SetRSOwnerRef)

	internalDeschedulerPolicy := removePodsViolatingNodeTaintsPolicy()
	ctxCancel, cancel := context.WithCancel(ctx)
	defer cancel()

	// Create descheduler with DryRun mode
	client := fakeclientset.NewSimpleClientset(node1, node2, p1)
	eventClient := fakeclientset.NewSimpleClientset(node1, node2, p1)

	rs, err := options.NewDeschedulerServer()
	if err != nil {
		t.Fatalf("Unable to initialize server: %v", err)
	}
	rs.Client = client
	rs.EventClient = eventClient
	rs.DefaultFeatureGates = initFeatureGates()
	rs.DryRun = true // Set DryRun before creating descheduler

	sharedInformerFactory := informers.NewSharedInformerFactoryWithOptions(rs.Client, 0, informers.WithTransform(trimManagedFields))
	eventBroadcaster, eventRecorder := utils.GetRecorderAndBroadcaster(ctxCancel, client)
	defer eventBroadcaster.Shutdown()

	// Always create descheduler with real client/factory first to register all informers
	descheduler, err := newDescheduler(ctxCancel, rs, internalDeschedulerPolicy, "v1", eventRecorder, rs.Client, sharedInformerFactory, nil)
	if err != nil {
		t.Fatalf("Unable to create descheduler instance: %v", err)
	}

	sharedInformerFactory.Start(ctxCancel.Done())
	sharedInformerFactory.WaitForCacheSync(ctxCancel.Done())

	// Create sandbox with resources to mirror from real client
	kubeClientSandbox, err := newDefaultKubeClientSandbox(rs.Client, sharedInformerFactory)
	if err != nil {
		t.Fatalf("Failed to create kube client sandbox: %v", err)
	}

	// Replace descheduler with one using fake client/factory
	descheduler, err = newDescheduler(ctxCancel, rs, internalDeschedulerPolicy, "v1", eventRecorder, kubeClientSandbox.fakeClient(), kubeClientSandbox.fakeSharedInformerFactory(), kubeClientSandbox)
	if err != nil {
		t.Fatalf("Unable to create dry run descheduler instance: %v", err)
	}

	// Start and sync the fake factory after creating the descheduler
	kubeClientSandbox.fakeSharedInformerFactory().Start(ctxCancel.Done())
	kubeClientSandbox.fakeSharedInformerFactory().WaitForCacheSync(ctxCancel.Done())

	// Verify the pod exists in the fake client after initialization
	pod, err := kubeClientSandbox.fakeClient().CoreV1().Pods(p1.Namespace).Get(ctx, p1.Name, metav1.GetOptions{})
	if err != nil {
		t.Fatalf("Expected pod %s to exist in fake client after initialization, but got error: %v", p1.Name, err)
	}
	verifyPodIdentity(t, pod, p1.Name, p1.Namespace, p1.UID)
	klog.Infof("Pod %s exists in fake client after initialization", p1.Name)

	// Run two descheduling cycles to verify pod eviction and restoration works repeatedly
	for i := 1; i <= 2; i++ {
		// Run descheduling cycle
		klog.Infof("Running descheduling cycle %d", i)
		descheduler.podEvictor.ResetCounters()
		descheduler.runProfiles(ctx)

		// Verify the pod was evicted (should not exist in fake client anymore)
		_, err = kubeClientSandbox.fakeClient().CoreV1().Pods(p1.Namespace).Get(ctx, p1.Name, metav1.GetOptions{})
		if err == nil {
			t.Fatalf("Expected pod %s to be evicted from fake client in cycle %d, but it still exists", p1.Name, i)
		}
		if !apierrors.IsNotFound(err) {
			t.Fatalf("Expected NotFound error for pod %s in cycle %d, got: %v", p1.Name, i, err)
		}
		klog.Infof("Pod %s was successfully evicted from fake client in cycle %d", p1.Name, i)

		// Verify the pod was added to the evicted pods cache
		evictedPods := descheduler.kubeClientSandbox.evictedPodsCache.list()
		if len(evictedPods) != 1 {
			t.Fatalf("Expected 1 pod in evicted cache in cycle %d, got %d", i, len(evictedPods))
		}
		verifyPodIdentityFields(t, evictedPods[0].Name, evictedPods[0].Namespace, evictedPods[0].UID, p1.Name, p1.Namespace, string(p1.UID), fmt.Sprintf(" in cycle %d", i))
		klog.Infof("Pod %s was successfully added to evicted pods cache in cycle %d (UID: %s)", p1.Name, i, p1.UID)

		// Restore evicted pods
		klog.Infof("Restoring evicted pods from cache in cycle %d", i)
		if err := descheduler.kubeClientSandbox.restoreEvictedPods(ctx); err != nil {
			t.Fatalf("Failed to restore evicted pods in cycle %d: %v", i, err)
		}
		descheduler.kubeClientSandbox.evictedPodsCache.clear()

		// Verify the pod was restored back to the fake client
		pod, err = kubeClientSandbox.fakeClient().CoreV1().Pods(p1.Namespace).Get(ctx, p1.Name, metav1.GetOptions{})
		if err != nil {
			t.Fatalf("Expected pod %s to be restored to fake client in cycle %d, but got error: %v", p1.Name, i, err)
		}
		verifyPodIdentity(t, pod, p1.Name, p1.Namespace, p1.UID)
		klog.Infof("Pod %s was successfully restored to fake client in cycle %d (UID: %s)", p1.Name, i, pod.UID)

		// Verify cache was cleared after restoration
		evictedPods = descheduler.kubeClientSandbox.evictedPodsCache.list()
		if len(evictedPods) != 0 {
			t.Fatalf("Expected evicted cache to be empty after restoration in cycle %d, got %d pods", i, len(evictedPods))
		}
		klog.Infof("Evicted pods cache was cleared after restoration in cycle %d", i)
	}
}

// verifyAllPrometheusClientsEqual checks that all Prometheus client variables are equal to the expected value
func verifyAllPrometheusClientsEqual(t *testing.T, expected, fromReactor, fromPluginHandle, fromDescheduler promapi.Client) {
	t.Helper()
	if fromReactor != expected {
		t.Fatalf("Prometheus client from reactor: expected %v, got %v", expected, fromReactor)
	}
	if fromPluginHandle != expected {
		t.Fatalf("Prometheus client from plugin handle: expected %v, got %v", expected, fromPluginHandle)
	}
	if fromDescheduler != expected {
		t.Fatalf("Prometheus client from descheduler: expected %v, got %v", expected, fromDescheduler)
	}
	t.Logf("All Prometheus client variables correctly set to: %v", expected)
}

// TestPluginPrometheusClientAccess tests that the Prometheus client is accessible through the plugin handle
func TestPluginPrometheusClientAccess(t *testing.T) {
	testCases := []struct {
		name   string
		dryRun bool
	}{
		{
			name:   "dry run disabled",
			dryRun: false,
		},
		{
			name:   "dry run enabled",
			dryRun: true,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			ctx := context.Background()

			initPluginRegistry()

			newInvoked := false
			reactorInvoked := false
			var prometheusClientFromPluginNewHandle promapi.Client
			var prometheusClientFromReactor promapi.Client

			fakePlugin := &fakeplugin.FakePlugin{
				PluginName: "TestPluginWithPrometheusClient",
			}

			fakePlugin.AddReactor(string(frameworktypes.DescheduleExtensionPoint), func(action fakeplugin.Action) (handled, filter bool, err error) {
				if dAction, ok := action.(fakeplugin.DescheduleAction); ok {
					reactorInvoked = true
					prometheusClientFromReactor = dAction.Handle().PrometheusClient()
					return true, false, nil
				}
				return false, false, nil
			})

			pluginregistry.Register(
				fakePlugin.PluginName,
				fakeplugin.NewPluginFncFromFakeWithReactor(fakePlugin, func(action fakeplugin.ActionImpl) {
					newInvoked = true
					prometheusClientFromPluginNewHandle = action.Handle().PrometheusClient()
				}),
				&fakeplugin.FakePlugin{},
				&fakeplugin.FakePluginArgs{},
				fakeplugin.ValidateFakePluginArgs,
				fakeplugin.SetDefaults_FakePluginArgs,
				pluginregistry.PluginRegistry,
			)

			deschedulerPolicy := &api.DeschedulerPolicy{
				Profiles: []api.DeschedulerProfile{
					{
						Name: "test-profile",
						PluginConfigs: []api.PluginConfig{
							{
								Name: fakePlugin.PluginName,
								Args: &fakeplugin.FakePluginArgs{},
							},
						},
						Plugins: api.Plugins{
							Deschedule: api.PluginSet{
								Enabled: []string{fakePlugin.PluginName},
							},
						},
					},
				},
			}

			node1 := test.BuildTestNode("node1", 1000, 2000, 9, nil)
			node2 := test.BuildTestNode("node2", 1000, 2000, 9, nil)

			_, descheduler, _ := initDescheduler(t, ctx, initFeatureGates(), deschedulerPolicy, nil, tc.dryRun, node1, node2)

			// Test cycles with different Prometheus client values
			cycles := []struct {
				name   string
				client promapi.Client
			}{
				{
					name:   "initial client",
					client: &mockPrometheusClient{name: "new-init-client"},
				},
				{
					name:   "nil client",
					client: nil,
				},
				{
					name:   "new client",
					client: &mockPrometheusClient{name: "new-client"},
				},
				{
					name:   "another client",
					client: &mockPrometheusClient{name: "another-client"},
				},
			}

			for i, cycle := range cycles {
				t.Logf("Cycle %d: %s", i+1, cycle.name)

				// Set the descheduler's Prometheus client
				t.Logf("Setting descheduler.prometheusClient from %v to %v", descheduler.prometheusClient, cycle.client)
				descheduler.prometheusClient = cycle.client

				newInvoked = false
				reactorInvoked = false
				prometheusClientFromPluginNewHandle = nil
				prometheusClientFromReactor = nil

				descheduler.runProfiles(ctx)

				t.Logf("After cycle %d: prometheusClientFromReactor=%v, descheduler.prometheusClient=%v", i+1, prometheusClientFromReactor, descheduler.prometheusClient)

				if !newInvoked {
					t.Fatalf("Expected plugin new to be invoked during cycle %d", i+1)
				}

				if !reactorInvoked {
					t.Fatalf("Expected deschedule reactor to be invoked during cycle %d", i+1)
				}

				verifyAllPrometheusClientsEqual(t, cycle.client, prometheusClientFromReactor, prometheusClientFromPluginNewHandle, descheduler.prometheusClient)
			}
		})
	}
}

@@ -403,12 +403,6 @@ func (pe *PodEvictor) ResetCounters() {
	pe.totalPodCount = 0
}

func (pe *PodEvictor) SetClient(client clientset.Interface) {
	pe.mu.Lock()
	defer pe.mu.Unlock()
	pe.client = client
}

func (pe *PodEvictor) evictionRequestsTotal() uint {
	if pe.featureGates.Enabled(features.EvictionsInBackground) {
		return pe.erCache.evictionRequestsTotal()

pkg/descheduler/kubeclientsandbox.go (new file, 446 lines)
@@ -0,0 +1,446 @@
/*
Copyright 2026 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package descheduler

import (
	"context"
	"fmt"
	"sync"
	"time"

	v1 "k8s.io/api/core/v1"
	policy "k8s.io/api/policy/v1"
	policyv1 "k8s.io/api/policy/v1"
	schedulingv1 "k8s.io/api/scheduling/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
	clientset "k8s.io/client-go/kubernetes"
	fakeclientset "k8s.io/client-go/kubernetes/fake"
	core "k8s.io/client-go/testing"
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2"
)

// evictedPodInfo stores identifying information about a pod that was evicted during dry-run mode
type evictedPodInfo struct {
	Namespace string
	Name      string
	UID       string
}

// evictedPodsCache is a thread-safe cache for tracking pods evicted during dry-run mode
type evictedPodsCache struct {
	sync.RWMutex
	pods map[string]*evictedPodInfo
}

func newEvictedPodsCache() *evictedPodsCache {
	return &evictedPodsCache{
		pods: make(map[string]*evictedPodInfo),
	}
}

func (c *evictedPodsCache) add(pod *v1.Pod) {
	c.Lock()
	defer c.Unlock()
	c.pods[string(pod.UID)] = &evictedPodInfo{
		Namespace: pod.Namespace,
		Name:      pod.Name,
		UID:       string(pod.UID),
	}
}

func (c *evictedPodsCache) list() []*evictedPodInfo {
	c.RLock()
	defer c.RUnlock()
	pods := make([]*evictedPodInfo, 0, len(c.pods))
	for _, pod := range c.pods {
		podCopy := *pod
		pods = append(pods, &podCopy)
	}
	return pods
}

func (c *evictedPodsCache) clear() {
	c.Lock()
	defer c.Unlock()
	c.pods = make(map[string]*evictedPodInfo)
}
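
// Illustrative sketch (editorial, not part of the original change): the cache is written to
// by the eviction reactor and drained by the descheduling loop, roughly as follows; the
// surrounding wiring is assumed.
//
//	c := newEvictedPodsCache()
//	c.add(evictedPod)            // recorded when the fake client sees an eviction
//	for _, p := range c.list() { // list() returns copies, safe to inspect
//		klog.V(4).InfoS("evicted during dry run", "namespace", p.Namespace, "name", p.Name, "uid", p.UID)
//	}
//	c.clear()                    // emptied once the pods have been restored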

// kubeClientSandbox creates a sandbox environment with a fake client and informer factory
// that mirrors resources from a real client, useful for dry-run testing scenarios
type kubeClientSandbox struct {
	fakeKubeClient         *fakeclientset.Clientset
	fakeFactory            informers.SharedInformerFactory
	resourceToInformer     map[schema.GroupVersionResource]informers.GenericInformer
	evictedPodsCache       *evictedPodsCache
	podEvictionReactionFnc func(*fakeclientset.Clientset, *evictedPodsCache) func(action core.Action) (bool, runtime.Object, error)
}

func newDefaultKubeClientSandbox(client clientset.Interface, sharedInformerFactory informers.SharedInformerFactory) (*kubeClientSandbox, error) {
	return newKubeClientSandbox(client, sharedInformerFactory,
		v1.SchemeGroupVersion.WithResource("pods"),
		v1.SchemeGroupVersion.WithResource("nodes"),
		v1.SchemeGroupVersion.WithResource("namespaces"),
		schedulingv1.SchemeGroupVersion.WithResource("priorityclasses"),
		policyv1.SchemeGroupVersion.WithResource("poddisruptionbudgets"),
		v1.SchemeGroupVersion.WithResource("persistentvolumeclaims"),
	)
}

func newKubeClientSandbox(client clientset.Interface, sharedInformerFactory informers.SharedInformerFactory, resources ...schema.GroupVersionResource) (*kubeClientSandbox, error) {
	sandbox := &kubeClientSandbox{
		resourceToInformer:     make(map[schema.GroupVersionResource]informers.GenericInformer),
		evictedPodsCache:       newEvictedPodsCache(),
		podEvictionReactionFnc: podEvictionReactionFnc,
	}

	sandbox.fakeKubeClient = fakeclientset.NewSimpleClientset()
	// simulate a pod eviction by deleting a pod
	sandbox.fakeKubeClient.PrependReactor("create", "pods", sandbox.podEvictionReactionFnc(sandbox.fakeKubeClient, sandbox.evictedPodsCache))
	sandbox.fakeFactory = informers.NewSharedInformerFactory(sandbox.fakeKubeClient, 0)

	for _, resource := range resources {
		informer, err := sharedInformerFactory.ForResource(resource)
		if err != nil {
			return nil, err
		}
		sandbox.resourceToInformer[resource] = informer
	}

	// Register event handlers to sync changes from real client to fake client.
	// These handlers will keep the fake client in sync with ongoing changes.
	if err := sandbox.registerEventHandlers(); err != nil {
		return nil, fmt.Errorf("error registering event handlers: %w", err)
	}

	return sandbox, nil
}
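
// Illustrative sketch (assumed wiring, mirroring the dry-run test in this change): the
// sandbox is built from the real client and an already-registered shared informer factory,
// and its fake factory is then started and synced before use. Variable names are assumptions.
//
//	sandbox, err := newDefaultKubeClientSandbox(realClient, realSharedInformerFactory)
//	if err != nil { /* handle error */ }
//	sandbox.fakeSharedInformerFactory().Start(ctx.Done())
//	sandbox.fakeSharedInformerFactory().WaitForCacheSync(ctx.Done())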

func (sandbox *kubeClientSandbox) registerEventHandlers() error {
	for resource, informer := range sandbox.resourceToInformer {
		// Create a local copy to avoid closure capture issue
		resource := resource

		_, err := sandbox.fakeFactory.ForResource(resource)
		if err != nil {
			return fmt.Errorf("error getting resource %s for fake factory: %w", resource, err)
		}

		_, err = informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				runtimeObj, ok := obj.(runtime.Object)
				if !ok {
					klog.ErrorS(nil, "object is not a runtime.Object", "resource", resource)
					return
				}
				if err := sandbox.fakeKubeClient.Tracker().Add(runtimeObj); err != nil {
					if !apierrors.IsAlreadyExists(err) {
						klog.ErrorS(err, "failed to add object to fake client", "resource", resource)
					}
				}
			},
			UpdateFunc: func(oldObj, newObj interface{}) {
				runtimeObj, ok := newObj.(runtime.Object)
				if !ok {
					klog.ErrorS(nil, "object is not a runtime.Object", "resource", resource)
					return
				}
				metaObj, err := meta.Accessor(runtimeObj)
				if err != nil {
					klog.ErrorS(err, "failed to get object metadata", "resource", resource)
					return
				}
				if err := sandbox.fakeKubeClient.Tracker().Update(resource, runtimeObj, metaObj.GetNamespace()); err != nil {
					klog.ErrorS(err, "failed to update object in fake client", "resource", resource)
				}
			},
			DeleteFunc: func(obj interface{}) {
				// Handle tombstone case where the object might be wrapped
				if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
					obj = tombstone.Obj
				}

				runtimeObj, ok := obj.(runtime.Object)
				if !ok {
					klog.ErrorS(nil, "object is not a runtime.Object", "resource", resource)
					return
				}
				metaObj, err := meta.Accessor(runtimeObj)
				if err != nil {
					klog.ErrorS(err, "failed to get object metadata", "resource", resource)
					return
				}
				if err := sandbox.fakeKubeClient.Tracker().Delete(resource, metaObj.GetNamespace(), metaObj.GetName()); err != nil {
					klog.ErrorS(err, "failed to delete object from fake client", "resource", resource)
				}
			},
		})
		if err != nil {
			return fmt.Errorf("error adding event handler for resource %s: %w", resource, err)
		}
	}
	return nil
}

func (sandbox *kubeClientSandbox) fakeClient() *fakeclientset.Clientset {
	return sandbox.fakeKubeClient
}

func (sandbox *kubeClientSandbox) fakeSharedInformerFactory() informers.SharedInformerFactory {
	return sandbox.fakeFactory
}

func (sandbox *kubeClientSandbox) reset() {
	sandbox.evictedPodsCache.clear()
}

// hasObjectInIndexer checks if an object exists in the fake indexer for the specified resource
func (sandbox *kubeClientSandbox) hasObjectInIndexer(resource schema.GroupVersionResource, namespace, name, uid string) (bool, error) {
	informer, err := sandbox.fakeFactory.ForResource(resource)
	if err != nil {
		return false, fmt.Errorf("error getting informer for resource %s: %w", resource, err)
	}

	key := cache.MetaObjectToName(&metav1.ObjectMeta{Namespace: namespace, Name: name}).String()
	obj, exists, err := informer.Informer().GetIndexer().GetByKey(key)
	if err != nil {
		return false, err
	}

	if !exists {
		return false, nil
	}

	metaObj, err := meta.Accessor(obj)
	if err != nil {
		return false, fmt.Errorf("failed to get object metadata: %w", err)
	}
	return string(metaObj.GetUID()) == uid, nil
}

// hasRuntimeObjectInIndexer checks if a runtime.Object exists in the fake indexer by detecting its resource type
func (sandbox *kubeClientSandbox) hasRuntimeObjectInIndexer(obj runtime.Object) (bool, error) {
	// Get metadata accessor to extract namespace and name
	metaObj, err := meta.Accessor(obj)
	if err != nil {
		return false, fmt.Errorf("failed to get object metadata: %w", err)
	}

	// Get the GVK from the object using TypeMeta
	gvk := obj.GetObjectKind().GroupVersionKind()
	if gvk.Empty() {
		return false, fmt.Errorf("no GroupVersionKind found for object")
	}

	// Use the GVK to construct the GVR by pluralizing the kind
	plural, _ := meta.UnsafeGuessKindToResource(gvk)
	gvr := schema.GroupVersionResource{
		Group:    gvk.Group,
		Version:  gvk.Version,
		Resource: plural.Resource,
	}

	return sandbox.hasObjectInIndexer(gvr, metaObj.GetNamespace(), metaObj.GetName(), string(metaObj.GetUID()))
}

// shouldSkipPodWait checks if a pod still exists in the fake client with the expected UID.
// Returns (shouldSkip, error). shouldSkip is true if we should skip waiting for this pod.
// Returns an error for unexpected failures (non-NotFound errors).
func (sandbox *kubeClientSandbox) shouldSkipPodWait(ctx context.Context, pod *evictedPodInfo) (bool, error) {
	// Check if the pod still exists in the fake client (it could have been deleted from the real client
	// and the deletion propagated via event handlers in the meantime)
	fakePod, err := sandbox.fakeClient().CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
	if err != nil {
		if apierrors.IsNotFound(err) {
			klog.V(3).InfoS("Pod no longer exists in fake client, skipping wait", "namespace", pod.Namespace, "name", pod.Name)
			return true, nil
		}
		return false, fmt.Errorf("error getting pod %s/%s from fake client: %w", pod.Namespace, pod.Name, err)
	}
	if string(fakePod.UID) != pod.UID {
		klog.V(3).InfoS("Pod UID mismatch in fake client, skipping wait", "namespace", pod.Namespace, "name", pod.Name, "expectedUID", pod.UID, "actualUID", string(fakePod.UID))
		return true, nil
	}
	return false, nil
}

func waitForPodsCondition(ctx context.Context, pods []*evictedPodInfo, checkFn func(*evictedPodInfo) (bool, error), successMsg string) error {
	if len(pods) == 0 {
		return nil
	}

	err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
		for _, pod := range pods {
			satisfied, err := checkFn(pod)
			if err != nil {
				return false, err
			}
			if !satisfied {
				return false, nil
			}
		}
		return true, nil
	})
	if err != nil {
		return err
	}

	klog.V(4).InfoS(successMsg)
	return nil
}

// restoreEvictedPods restores pods from the evicted pods cache back to the fake client
func (sandbox *kubeClientSandbox) restoreEvictedPods(ctx context.Context) error {
	podInformer, ok := sandbox.resourceToInformer[v1.SchemeGroupVersion.WithResource("pods")]
	if !ok {
		return fmt.Errorf("pod informer not found in resourceToInformer")
	}

	evictedPods := sandbox.evictedPodsCache.list()

	// First wait loop: Check that all evicted pods are cleared from the indexers.
	// This ensures the eviction has fully propagated through the fake informer's indexer.
	// We check both existence and UID match to handle cases where a pod was deleted and
	// recreated with the same name but different UID.
	if err := waitForPodsCondition(ctx, evictedPods, func(pod *evictedPodInfo) (bool, error) {
		exists, err := sandbox.hasObjectInIndexer(v1.SchemeGroupVersion.WithResource("pods"), pod.Namespace, pod.Name, pod.UID)
		if err != nil {
			klog.V(4).InfoS("Error checking indexer for pod", "namespace", pod.Namespace, "name", pod.Name, "error", err)
			return false, nil
		}
		if exists {
			klog.V(4).InfoS("Pod with matching UID still exists in fake indexer, waiting", "namespace", pod.Namespace, "name", pod.Name, "uid", pod.UID)
			return false, nil
		}
		klog.V(4).InfoS("Pod with matching UID no longer in fake indexer", "namespace", pod.Namespace, "name", pod.Name, "uid", pod.UID)
		return true, nil
	}, "All evicted pods removed from fake indexer"); err != nil {
		return fmt.Errorf("timeout waiting for evicted pods to be removed from fake indexer: %w", err)
	}

	var restoredPods []*evictedPodInfo
	for _, podInfo := range sandbox.evictedPodsCache.list() {
		obj, err := podInformer.Lister().ByNamespace(podInfo.Namespace).Get(podInfo.Name)
		if err != nil {
			klog.V(3).InfoS("Pod not found in real client, skipping restoration", "namespace", podInfo.Namespace, "name", podInfo.Name, "error", err)
			continue
		}

		pod, ok := obj.(*v1.Pod)
		if !ok {
			klog.ErrorS(nil, "Object is not a pod", "namespace", podInfo.Namespace, "name", podInfo.Name)
			continue
		}

		if string(pod.UID) != podInfo.UID {
			klog.V(3).InfoS("Pod UID mismatch, skipping restoration", "namespace", podInfo.Namespace, "name", podInfo.Name, "expectedUID", podInfo.UID, "actualUID", string(pod.UID))
			continue
		}

		if err = sandbox.fakeKubeClient.Tracker().Add(pod); err != nil && !apierrors.IsAlreadyExists(err) {
			return fmt.Errorf("failed to restore pod %s/%s to fake client: %w", podInfo.Namespace, podInfo.Name, err)
		}
		klog.V(4).InfoS("Successfully restored pod to fake client", "namespace", podInfo.Namespace, "name", podInfo.Name, "uid", podInfo.UID)
		restoredPods = append(restoredPods, podInfo)
	}

	// Second wait loop: Make sure the evicted pods are added back to the fake client.
	// This ensures the restored pods are accessible through the fake informer's lister.
	if err := waitForPodsCondition(ctx, restoredPods, func(pod *evictedPodInfo) (bool, error) {
		podObj, err := sandbox.fakeFactory.Core().V1().Pods().Lister().Pods(pod.Namespace).Get(pod.Name)
		if err != nil {
			klog.V(4).InfoS("Pod not yet accessible in fake informer, waiting", "namespace", pod.Namespace, "name", pod.Name)
			shouldSkip, checkErr := sandbox.shouldSkipPodWait(ctx, pod)
			if checkErr != nil {
				return false, checkErr
			}
			if shouldSkip {
				return true, nil
			}
			return false, nil
		}
		klog.V(4).InfoS("Pod accessible in fake informer", "namespace", pod.Namespace, "name", pod.Name, "node", podObj.Spec.NodeName)
		return true, nil
	}, "All restored pods are accessible in fake informer"); err != nil {
		return fmt.Errorf("timeout waiting for pods to be accessible in fake informer: %w", err)
	}

	// Third wait loop: Make sure the indexers see the added pods.
	// This is important to ensure each descheduling cycle can see all the restored pods.
	// Without this wait, the next cycle might not see the restored pods in the indexer yet.
	if err := waitForPodsCondition(ctx, restoredPods, func(pod *evictedPodInfo) (bool, error) {
		exists, err := sandbox.hasObjectInIndexer(v1.SchemeGroupVersion.WithResource("pods"), pod.Namespace, pod.Name, pod.UID)
		if err != nil {
			klog.V(4).InfoS("Error checking indexer for restored pod", "namespace", pod.Namespace, "name", pod.Name, "error", err)
			return false, nil
		}
		if !exists {
			klog.V(4).InfoS("Restored pod not yet in fake indexer, waiting", "namespace", pod.Namespace, "name", pod.Name)
			shouldSkip, checkErr := sandbox.shouldSkipPodWait(ctx, pod)
			if checkErr != nil {
				return false, checkErr
			}
			if shouldSkip {
				return true, nil
			}
			return false, nil
		}
		klog.V(4).InfoS("Restored pod now in fake indexer", "namespace", pod.Namespace, "name", pod.Name)
		return true, nil
	}, "All restored pods are now in fake indexer"); err != nil {
		return fmt.Errorf("timeout waiting for restored pods to appear in fake indexer: %w", err)
	}

	return nil
}

func podEvictionReactionFnc(fakeClient *fakeclientset.Clientset, evictedCache *evictedPodsCache) func(action core.Action) (bool, runtime.Object, error) {
	return func(action core.Action) (bool, runtime.Object, error) {
		if action.GetSubresource() == "eviction" {
			createAct, matched := action.(core.CreateActionImpl)
			if !matched {
				return false, nil, fmt.Errorf("unable to convert action to core.CreateActionImpl")
			}
			eviction, matched := createAct.Object.(*policy.Eviction)
			if !matched {
				return false, nil, fmt.Errorf("unable to convert action object into *policy.Eviction")
			}
			podObj, err := fakeClient.Tracker().Get(action.GetResource(), eviction.GetNamespace(), eviction.GetName())
			if err == nil {
				if pod, ok := podObj.(*v1.Pod); ok {
					evictedCache.add(pod)
				} else {
					return false, nil, fmt.Errorf("unable to convert object to *v1.Pod for %v/%v", eviction.GetNamespace(), eviction.GetName())
				}
			} else if !apierrors.IsNotFound(err) {
				return false, nil, fmt.Errorf("unable to get pod %v/%v: %v", eviction.GetNamespace(), eviction.GetName(), err)
			}
			if err := fakeClient.Tracker().Delete(action.GetResource(), eviction.GetNamespace(), eviction.GetName()); err != nil {
				return false, nil, fmt.Errorf("unable to delete pod %v/%v: %v", eviction.GetNamespace(), eviction.GetName(), err)
			}
			return true, nil, nil
		}
		// fallback to the default reactor
		return false, nil, nil
	}
}
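
// Note (editorial): eviction requests reach the fake client as a "create" action on the
// "pods" resource with the "eviction" subresource, which is why newKubeClientSandbox
// registers this reactor via
//
//	fakeKubeClient.PrependReactor("create", "pods", podEvictionReactionFnc(fakeKubeClient, evictedPodsCache))
//
// so a dry-run eviction only deletes the pod from the fake tracker and records it in the
// evicted pods cache, leaving the real cluster untouched.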

pkg/descheduler/kubeclientsandbox_test.go (new file, 582 lines)
@@ -0,0 +1,582 @@
|
||||
/*
|
||||
Copyright 2026 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package descheduler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
policy "k8s.io/api/policy/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/informers"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
fakeclientset "k8s.io/client-go/kubernetes/fake"
|
||||
|
||||
"sigs.k8s.io/descheduler/test"
|
||||
)
|
||||
|
||||
// setupTestSandbox creates and initializes a kubeClientSandbox for testing.
|
||||
// It creates a fake client, informer factory, registers pod and node informers,
|
||||
// creates the sandbox, and starts both factories.
|
||||
func setupTestSandbox(ctx context.Context, t *testing.T, initialObjects ...runtime.Object) (*kubeClientSandbox, informers.SharedInformerFactory, clientset.Interface) {
|
||||
// Create a "real" fake client to act as the source of truth
|
||||
realClient := fakeclientset.NewSimpleClientset(initialObjects...)
|
||||
realFactory := informers.NewSharedInformerFactoryWithOptions(realClient, 0)
|
||||
|
||||
// Register pods and nodes informers BEFORE creating the sandbox
|
||||
_ = realFactory.Core().V1().Pods().Informer()
|
||||
_ = realFactory.Core().V1().Nodes().Informer()
|
||||
|
||||
// Create the sandbox with only pods and nodes resources
|
||||
sandbox, err := newKubeClientSandbox(realClient, realFactory,
|
||||
v1.SchemeGroupVersion.WithResource("pods"),
|
||||
v1.SchemeGroupVersion.WithResource("nodes"),
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create kubeClientSandbox: %v", err)
|
||||
}
|
||||
|
||||
// fake factory created by newKubeClientSandbox needs to be started before
|
||||
// the "real" fake client factory to have all handlers registered
|
||||
// to get complete propagation
|
||||
sandbox.fakeSharedInformerFactory().Start(ctx.Done())
|
||||
sandbox.fakeSharedInformerFactory().WaitForCacheSync(ctx.Done())
|
||||
|
||||
realFactory.Start(ctx.Done())
|
||||
realFactory.WaitForCacheSync(ctx.Done())
|
||||
|
||||
return sandbox, realFactory, realClient
|
||||
}
|
||||
|
||||
func TestKubeClientSandboxEventHandlers(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
sandbox, realFactory, realClient := setupTestSandbox(ctx, t)
|
||||
|
||||
// Register a third resource (secrets) in the real factory AFTER sandbox creation
|
||||
// This should NOT be synced to the fake client
|
||||
_ = realFactory.Core().V1().Secrets().Informer()
|
||||
|
||||
// Create test objects
|
||||
testPod := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "test-pod",
|
||||
Namespace: "default",
|
||||
UID: "pod-uid-12345",
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
NodeName: "test-node",
|
||||
},
|
||||
}
|
||||
|
||||
testNode := &v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "test-node",
|
||||
UID: "node-uid-67890",
|
||||
},
|
||||
}
|
||||
|
||||
testSecret := &v1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "test-secret",
|
||||
Namespace: "default",
|
||||
UID: "secret-uid-abcde",
|
||||
},
|
||||
Data: map[string][]byte{
|
||||
"key": []byte("value"),
|
||||
},
|
||||
}
|
||||
|
||||
// Add objects to the real client
|
||||
var err error
|
||||
_, err = realClient.CoreV1().Pods(testPod.Namespace).Create(ctx, testPod, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create pod in real client: %v", err)
|
||||
}
|
||||
|
||||
_, err = realClient.CoreV1().Nodes().Create(ctx, testNode, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create node in real client: %v", err)
|
||||
}
|
||||
|
||||
_, err = realClient.CoreV1().Secrets(testSecret.Namespace).Create(ctx, testSecret, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create secret in real client: %v", err)
|
||||
}
|
||||
|
||||
// Helper function to wait for a resource to appear in the sandbox's fake client indexer
|
||||
waitForResourceInIndexer := func(resourceType, namespace, name, uid, description string) error {
|
||||
t.Logf("Waiting for %s to appear in fake client indexer...", description)
|
||||
return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
|
||||
exists, err := sandbox.hasObjectInIndexer(
|
||||
v1.SchemeGroupVersion.WithResource(resourceType),
|
||||
namespace,
|
||||
name,
|
||||
uid,
|
||||
)
|
||||
if err != nil {
|
||||
t.Logf("Error checking %s in indexer: %v", description, err)
|
||||
return false, nil
|
||||
}
|
||||
if exists {
|
||||
t.Logf("%s appeared in fake client indexer", description)
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
}
|
||||
|
||||
// Wait for the pod to appear in the sandbox's fake client indexer
|
||||
if err := waitForResourceInIndexer("pods", testPod.Namespace, testPod.Name, string(testPod.UID), "pod"); err != nil {
|
||||
t.Fatalf("Pod did not appear in fake client indexer within timeout: %v", err)
|
||||
}
|
||||
|
||||
// Wait for the node to appear in the sandbox's fake client indexer
|
||||
if err := waitForResourceInIndexer("nodes", "", testNode.Name, string(testNode.UID), "node"); err != nil {
|
||||
t.Fatalf("Node did not appear in fake client indexer within timeout: %v", err)
|
||||
}
|
||||
|
||||
// Verify the pod can be retrieved from the fake client
|
||||
retrievedPod, err := sandbox.fakeClient().CoreV1().Pods(testPod.Namespace).Get(ctx, testPod.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to retrieve pod from fake client: %v", err)
|
||||
}
|
||||
if retrievedPod.Namespace != testPod.Namespace || retrievedPod.Name != testPod.Name || retrievedPod.UID != testPod.UID {
|
||||
t.Errorf("Retrieved pod mismatch: got namespace=%s name=%s uid=%s, want namespace=%s name=%s uid=%s",
|
||||
retrievedPod.Namespace, retrievedPod.Name, retrievedPod.UID, testPod.Namespace, testPod.Name, testPod.UID)
|
||||
}
|
||||
t.Logf("Successfully retrieved pod from fake client: %s/%s", retrievedPod.Namespace, retrievedPod.Name)
|
||||
|
||||
// Verify the node can be retrieved from the fake client
|
||||
retrievedNode, err := sandbox.fakeClient().CoreV1().Nodes().Get(ctx, testNode.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to retrieve node from fake client: %v", err)
|
||||
}
|
||||
if retrievedNode.Name != testNode.Name || retrievedNode.UID != testNode.UID {
|
||||
t.Errorf("Retrieved node mismatch: got name=%s uid=%s, want name=%s uid=%s",
|
||||
retrievedNode.Name, retrievedNode.UID, testNode.Name, testNode.UID)
|
||||
}
|
||||
t.Logf("Successfully retrieved node from fake client: %s", retrievedNode.Name)
|
||||
|
||||
// Wait a bit longer and verify the secret does NOT appear in the fake client indexer
|
||||
// because secrets were registered AFTER the sandbox was created
|
||||
t.Log("Verifying secret does NOT appear in fake client indexer...")
|
||||
time.Sleep(500 * time.Millisecond) // Give extra time to ensure it's not just a timing issue
|
||||
|
||||
// First, verify we can get the informer for secrets in the fake factory
|
||||
secretInformer, err := sandbox.fakeSharedInformerFactory().ForResource(v1.SchemeGroupVersion.WithResource("secrets"))
|
||||
if err != nil {
|
||||
t.Logf("Expected: Cannot get secret informer from fake factory: %v", err)
|
||||
} else {
|
||||
// If we can get the informer, check if the secret exists in it
|
||||
key := "default/test-secret"
|
||||
_, exists, err := secretInformer.Informer().GetIndexer().GetByKey(key)
|
||||
if err != nil {
|
||||
t.Logf("Error checking secret in fake indexer: %v", err)
|
||||
}
|
||||
if exists {
|
||||
t.Error("Secret should NOT exist in fake client indexer (it was registered after sandbox creation)")
|
||||
} else {
|
||||
t.Log("Correctly verified: Secret does not exist in fake client indexer")
|
||||
}
|
||||
}
|
||||
|
||||
// Also verify that attempting to get the secret directly from fake client should fail
|
||||
_, err = sandbox.fakeClient().CoreV1().Secrets(testSecret.Namespace).Get(ctx, testSecret.Name, metav1.GetOptions{})
|
||||
if err == nil {
|
||||
t.Error("Secret should NOT be retrievable from fake client (it was not synced)")
|
||||
} else {
|
||||
t.Logf("Correctly verified: Secret not retrievable from fake client: %v", err)
|
||||
}
|
||||
|
||||
// Verify the secret IS in the real client (sanity check)
|
||||
_, err = realClient.CoreV1().Secrets(testSecret.Namespace).Get(ctx, testSecret.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Secret should exist in real client but got error: %v", err)
|
||||
}
|
||||
t.Log("Sanity check passed: Secret exists in real client")
|
||||
}
|
||||
|
||||
func TestKubeClientSandboxReset(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
node1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
|
||||
p1 := test.BuildTestPod("p1", 100, 0, node1.Name, test.SetRSOwnerRef)
|
||||
p2 := test.BuildTestPod("p2", 100, 0, node1.Name, test.SetRSOwnerRef)
|
||||
|
||||
sandbox, _, _ := setupTestSandbox(ctx, t, node1, p1, p2)
|
||||
|
||||
eviction1 := &policy.Eviction{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: p1.Name,
|
||||
Namespace: p1.Namespace,
|
||||
},
|
||||
}
|
||||
eviction2 := &policy.Eviction{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: p2.Name,
|
||||
Namespace: p2.Namespace,
|
||||
},
|
||||
}
|
||||
|
||||
if err := sandbox.fakeClient().CoreV1().Pods(p1.Namespace).EvictV1(context.TODO(), eviction1); err != nil {
|
||||
t.Fatalf("Error evicting p1: %v", err)
|
||||
}
|
||||
if err := sandbox.fakeClient().CoreV1().Pods(p2.Namespace).EvictV1(context.TODO(), eviction2); err != nil {
|
||||
t.Fatalf("Error evicting p2: %v", err)
|
||||
}
|
||||
|
||||
evictedPods := sandbox.evictedPodsCache.list()
|
||||
if len(evictedPods) != 2 {
|
||||
t.Fatalf("Expected 2 evicted pods in cache, but got %d", len(evictedPods))
|
||||
}
|
||||
t.Logf("Evicted pods in cache before reset: %d", len(evictedPods))
|
||||
|
||||
for _, evictedPod := range evictedPods {
|
||||
if evictedPod.Namespace == "" || evictedPod.Name == "" || evictedPod.UID == "" {
|
||||
t.Errorf("Evicted pod has empty fields: namespace=%s, name=%s, uid=%s", evictedPod.Namespace, evictedPod.Name, evictedPod.UID)
|
||||
}
|
||||
t.Logf("Evicted pod: %s/%s (UID: %s)", evictedPod.Namespace, evictedPod.Name, evictedPod.UID)
|
||||
}
|
||||
|
||||
sandbox.reset()
|
||||
|
||||
evictedPodsAfterReset := sandbox.evictedPodsCache.list()
|
||||
if len(evictedPodsAfterReset) != 0 {
|
||||
t.Fatalf("Expected cache to be empty after reset, but found %d pods", len(evictedPodsAfterReset))
|
||||
}
|
||||
t.Logf("Successfully verified cache is empty after reset")
|
||||
}
|
||||
|
||||
func TestKubeClientSandboxRestoreEvictedPods(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
node1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
|
||||
|
||||
// Create test pods
|
||||
pod1 := test.BuildTestPod("pod1", 100, 0, node1.Name, test.SetRSOwnerRef)
|
||||
pod2 := test.BuildTestPod("pod2", 100, 0, node1.Name, test.SetRSOwnerRef)
|
||||
pod3 := test.BuildTestPod("pod3", 100, 0, node1.Name, test.SetRSOwnerRef)
|
||||
pod4 := test.BuildTestPod("pod4", 100, 0, node1.Name, test.SetRSOwnerRef)
|
||||
|
||||
sandbox, _, realClient := setupTestSandbox(ctx, t, node1, pod1, pod2, pod3, pod4)
|
||||
|
||||
// Evict all pods
|
||||
for _, pod := range []*v1.Pod{pod1, pod2, pod3, pod4} {
|
||||
eviction := &policy.Eviction{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: pod.Name,
|
||||
Namespace: pod.Namespace,
|
||||
},
|
||||
}
|
||||
if err := sandbox.fakeClient().CoreV1().Pods(pod.Namespace).EvictV1(ctx, eviction); err != nil {
|
||||
t.Fatalf("Error evicting %s: %v", pod.Name, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Delete pod2 from real client to simulate it being deleted (should skip restoration)
|
||||
if err := realClient.CoreV1().Pods(pod2.Namespace).Delete(ctx, pod2.Name, metav1.DeleteOptions{}); err != nil {
|
||||
t.Fatalf("Error deleting pod2 from real client: %v", err)
|
||||
}
|
||||
|
||||
// Delete and recreate pod3 with different UID in real client (should skip restoration)
|
||||
if err := realClient.CoreV1().Pods(pod3.Namespace).Delete(ctx, pod3.Name, metav1.DeleteOptions{}); err != nil {
|
||||
t.Fatalf("Error deleting pod3 from real client: %v", err)
|
||||
}
|
||||
pod3New := test.BuildTestPod("pod3", 100, 0, node1.Name, test.SetRSOwnerRef)
|
||||
// Ensure pod3New has a different UID from pod3
|
||||
if pod3New.UID == pod3.UID {
|
||||
pod3New.UID = "new-uid-for-pod3"
|
||||
}
|
||||
if _, err := realClient.CoreV1().Pods(pod3New.Namespace).Create(ctx, pod3New, metav1.CreateOptions{}); err != nil {
|
||||
t.Fatalf("Error recreating pod3 in real client: %v", err)
|
||||
}
|
||||
|
||||
// Verify evicted pods are in cache
|
||||
evictedPods := sandbox.evictedPodsCache.list()
|
||||
if len(evictedPods) != 4 {
|
||||
t.Fatalf("Expected 4 evicted pods in cache, got %d", len(evictedPods))
|
||||
}
|
||||
t.Logf("Evicted pods in cache: %d", len(evictedPods))
|
||||
|
||||
// Call restoreEvictedPods
|
||||
t.Log("Calling restoreEvictedPods...")
|
||||
if err := sandbox.restoreEvictedPods(ctx); err != nil {
|
||||
t.Fatalf("restoreEvictedPods failed: %v", err)
|
||||
}
|
||||
|
||||
// Verify pod1 and pod4 were restored (exists in fake client with matching UID and accessible via indexer)
|
||||
for _, pod := range []*v1.Pod{pod1, pod4} {
|
||||
// Check restoration via fake client
|
||||
restoredPod, err := sandbox.fakeClient().CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Errorf("%s should have been restored to fake client: %v", pod.Name, err)
|
||||
} else {
|
||||
if restoredPod.UID != pod.UID {
|
||||
t.Errorf("Restored %s UID mismatch: got %s, want %s", pod.Name, restoredPod.UID, pod.UID)
|
||||
}
|
||||
t.Logf("Successfully verified %s restoration: %s/%s (UID: %s)", pod.Name, restoredPod.Namespace, restoredPod.Name, restoredPod.UID)
|
||||
}
|
||||
|
||||
// Check accessibility via indexer
|
||||
exists, err := sandbox.hasObjectInIndexer(v1.SchemeGroupVersion.WithResource("pods"), pod.Namespace, pod.Name, string(pod.UID))
|
||||
if err != nil {
|
||||
t.Errorf("Error checking %s in indexer: %v", pod.Name, err)
|
||||
}
|
||||
if !exists {
|
||||
t.Errorf("%s should exist in fake indexer after restoration", pod.Name)
|
||||
} else {
|
||||
t.Logf("Successfully verified %s exists in fake indexer", pod.Name)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify pod2 was NOT restored (deleted from real client)
|
||||
_, err := sandbox.fakeClient().CoreV1().Pods(pod2.Namespace).Get(ctx, pod2.Name, metav1.GetOptions{})
|
||||
if err == nil {
|
||||
t.Error("pod2 should NOT have been restored (was deleted from real client)")
|
||||
} else {
|
||||
t.Logf("Correctly verified pod2 was not restored: %v", err)
|
||||
}
|
||||
|
||||
// Verify pod3 was NOT restored with old UID (UID mismatch case)
|
||||
// Note: pod3 may exist in fake client with NEW UID due to event handler syncing,
|
||||
// but it should NOT have been restored with the OLD UID from evicted cache
|
||||
pod3InFake, err := sandbox.fakeClient().CoreV1().Pods(pod3.Namespace).Get(ctx, pod3.Name, metav1.GetOptions{})
|
||||
if err == nil {
|
||||
// Pod3 exists, but it should have the NEW UID, not the old one
|
||||
if pod3InFake.UID == pod3.UID {
|
||||
t.Error("pod3 should NOT have been restored with old UID (UID mismatch should prevent restoration)")
|
||||
} else {
|
||||
t.Logf("Correctly verified pod3 has new UID (%s), not old UID (%s) - restoration was skipped", pod3InFake.UID, pod3.UID)
|
||||
}
|
||||
} else {
|
||||
// Pod3 doesn't exist - this is also acceptable (event handlers haven't synced it yet)
|
||||
t.Logf("pod3 not found in fake client: %v", err)
|
||||
}
|
||||
|
||||
// Verify evicted pods cache is still intact (restoreEvictedPods doesn't clear it)
|
||||
evictedPodsAfter := sandbox.evictedPodsCache.list()
|
||||
if len(evictedPodsAfter) != 4 {
|
||||
t.Errorf("Expected evicted pods cache to still have 4 entries, got %d", len(evictedPodsAfter))
|
||||
}
|
||||
}
|
||||
|
||||
func TestKubeClientSandboxRestoreEvictedPodsEmptyCache(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
node1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
|
||||
sandbox, _, _ := setupTestSandbox(ctx, t, node1)
|
||||
|
||||
// Call restoreEvictedPods with empty cache - should be a no-op
|
||||
t.Log("Calling restoreEvictedPods with empty cache...")
|
||||
if err := sandbox.restoreEvictedPods(ctx); err != nil {
|
||||
t.Fatalf("restoreEvictedPods should succeed with empty cache: %v", err)
|
||||
}
|
||||
t.Log("Successfully verified restoreEvictedPods handles empty cache")
|
||||
}
|
||||
|
||||
func TestEvictedPodsCache(t *testing.T) {
|
||||
t.Run("add single pod", func(t *testing.T) {
|
||||
const (
|
||||
podName = "pod1"
|
||||
podNamespace = "default"
|
||||
podUID = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
|
||||
)
|
||||
cache := newEvictedPodsCache()
|
||||
pod := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: podName,
|
||||
Namespace: podNamespace,
|
||||
UID: podUID,
|
||||
},
|
||||
}
|
||||
|
||||
cache.add(pod)
|
||||
|
||||
pods := cache.list()
|
||||
if len(pods) != 1 {
|
||||
t.Fatalf("Expected 1 pod in cache, got %d", len(pods))
|
||||
}
|
||||
if pods[0].Name != podName || pods[0].Namespace != podNamespace || pods[0].UID != podUID {
|
||||
t.Errorf("Pod data mismatch: got name=%s, namespace=%s, uid=%s", pods[0].Name, pods[0].Namespace, pods[0].UID)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("add multiple pods", func(t *testing.T) {
|
||||
cache := newEvictedPodsCache()
|
||||
pods := []*v1.Pod{
|
||||
{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default", UID: "11111111-1111-1111-1111-111111111111"}},
|
||||
{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "kube-system", UID: "22222222-2222-2222-2222-222222222222"}},
|
||||
{ObjectMeta: metav1.ObjectMeta{Name: "pod3", Namespace: "default", UID: "33333333-3333-3333-3333-333333333333"}},
|
||||
}
|
||||
|
||||
for _, pod := range pods {
|
||||
cache.add(pod)
|
||||
}
|
||||
|
||||
cachedPods := cache.list()
|
||||
if len(cachedPods) != 3 {
|
||||
t.Fatalf("Expected 3 pods in cache, got %d", len(cachedPods))
|
||||
}
|
||||
|
||||
podMap := make(map[string]*evictedPodInfo)
|
||||
for _, cachedPod := range cachedPods {
|
||||
podMap[cachedPod.UID] = cachedPod
|
||||
}
|
||||
|
||||
for _, pod := range pods {
|
||||
cached, ok := podMap[string(pod.UID)]
|
||||
if !ok {
|
||||
t.Errorf("Pod with UID %s not found in cache", pod.UID)
|
||||
continue
|
||||
}
|
||||
if cached.Name != pod.Name || cached.Namespace != pod.Namespace {
|
||||
t.Errorf("Pod data mismatch for UID %s: got name=%s, namespace=%s", pod.UID, cached.Name, cached.Namespace)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("add duplicate pod updates entry", func(t *testing.T) {
|
||||
const (
|
||||
duplicateUID = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
|
||||
updatedPodName = "pod1-new"
|
||||
updatedPodNS = "kube-system"
|
||||
)
|
||||
cache := newEvictedPodsCache()
|
||||
pod1 := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pod1",
|
||||
Namespace: "default",
|
||||
UID: duplicateUID,
|
||||
},
|
||||
}
|
||||
pod2 := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: updatedPodName,
|
||||
Namespace: updatedPodNS,
|
||||
UID: duplicateUID,
|
||||
},
|
||||
}
|
||||
|
||||
cache.add(pod1)
|
||||
cache.add(pod2)
|
||||
|
||||
pods := cache.list()
|
||||
if len(pods) != 1 {
|
||||
t.Fatalf("Expected 1 pod in cache (duplicates should overwrite), got %d", len(pods))
|
||||
}
|
||||
if pods[0].Name != updatedPodName || pods[0].Namespace != updatedPodNS {
|
||||
t.Errorf("Expected pod2 data, got name=%s, namespace=%s", pods[0].Name, pods[0].Namespace)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("list returns empty array for empty cache", func(t *testing.T) {
|
||||
cache := newEvictedPodsCache()
|
||||
pods := cache.list()
|
||||
if pods == nil {
|
||||
t.Fatal("Expected non-nil slice from list()")
|
||||
}
|
||||
if len(pods) != 0 {
|
||||
t.Fatalf("Expected empty list, got %d pods", len(pods))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("list returns copies not references", func(t *testing.T) {
|
||||
const originalPodName = "pod1"
|
||||
cache := newEvictedPodsCache()
|
||||
pod := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: originalPodName,
|
||||
Namespace: "default",
|
||||
UID: "12345678-1234-1234-1234-123456789abc",
|
||||
},
|
||||
}
|
||||
cache.add(pod)
|
||||
|
||||
pods1 := cache.list()
|
||||
pods2 := cache.list()
|
||||
|
||||
if len(pods1) != 1 || len(pods2) != 1 {
|
||||
t.Fatalf("Expected 1 pod in both lists")
|
||||
}
|
||||
|
||||
pods1[0].Name = "modified"
|
||||
|
||||
if pods2[0].Name == "modified" {
|
||||
t.Error("Modifying list result should not affect other list results (should be copies)")
|
||||
}
|
||||
|
||||
pods3 := cache.list()
|
||||
if pods3[0].Name != originalPodName {
|
||||
t.Error("Cache data was modified, list() should return copies")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("clear empties the cache", func(t *testing.T) {
|
||||
cache := newEvictedPodsCache()
|
||||
cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default", UID: "aaaa0000-0000-0000-0000-000000000001"}})
|
||||
cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "kube-system", UID: "bbbb0000-0000-0000-0000-000000000002"}})
|
||||
|
||||
if len(cache.list()) != 2 {
|
||||
t.Fatal("Expected 2 pods before clear")
|
||||
}
|
||||
|
||||
cache.clear()
|
||||
|
||||
pods := cache.list()
|
||||
if len(pods) != 0 {
|
||||
t.Fatalf("Expected empty cache after clear, got %d pods", len(pods))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("clear on empty cache is safe", func(t *testing.T) {
|
||||
cache := newEvictedPodsCache()
|
||||
cache.clear()
|
||||
|
||||
pods := cache.list()
|
||||
if len(pods) != 0 {
|
||||
t.Fatalf("Expected empty cache, got %d pods", len(pods))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("add after clear works correctly", func(t *testing.T) {
|
||||
cache := newEvictedPodsCache()
|
||||
cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default", UID: "00000001-0001-0001-0001-000000000001"}})
|
||||
cache.clear()
|
||||
cache.add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "kube-system", UID: "00000002-0002-0002-0002-000000000002"}})
|
||||
|
||||
pods := cache.list()
|
||||
if len(pods) != 1 {
|
||||
t.Fatalf("Expected 1 pod after clear and add, got %d", len(pods))
|
||||
}
|
||||
if pods[0].Name != "pod2" {
|
||||
t.Errorf("Expected pod2, got %s", pods[0].Name)
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -28,6 +28,7 @@ import (
	"k8s.io/apimachinery/pkg/labels"
	clientset "k8s.io/client-go/kubernetes"
	listersv1 "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog/v2"
	"sigs.k8s.io/descheduler/pkg/api"
@@ -78,6 +79,22 @@ func ReadyNodes(ctx context.Context, client clientset.Interface, nodeLister list
	return readyNodes, nil
}

// ReadyNodesFromInterfaces converts a list of interface{} items to ready nodes.
// Each interface{} item is expected to be a *v1.Node. Only ready nodes are returned.
func ReadyNodesFromInterfaces(nodeInterfaces []interface{}) ([]*v1.Node, error) {
	readyNodes := make([]*v1.Node, 0, len(nodeInterfaces))
	for i, nodeInterface := range nodeInterfaces {
		node, ok := nodeInterface.(*v1.Node)
		if !ok {
			return nil, fmt.Errorf("item at index %d is not a *v1.Node", i)
		}
		if IsReady(node) {
			readyNodes = append(readyNodes, node)
		}
	}
	return readyNodes, nil
}
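
// Illustrative sketch (assumed call site): this helper is intended to consume the raw
// []interface{} returned by an indexer lookup; nodeInformer and indexerName are assumptions.
//
//	objs, err := nodeInformer.GetIndexer().ByIndex(indexerName, indexerName)
//	if err != nil { /* handle error */ }
//	readyNodes, err := ReadyNodesFromInterfaces(objs)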

// IsReady checks if the descheduler could run against given node.
func IsReady(node *v1.Node) bool {
	for i := range node.Status.Conditions {
@@ -400,3 +417,22 @@ func podMatchesInterPodAntiAffinity(nodeIndexer podutil.GetPodsAssignedToNodeFun

	return false, nil
}

// AddNodeSelectorIndexer registers an indexer named indexerName on the node informer.
// Nodes whose labels match the given nodeSelector are indexed under the indexerName key,
// so they can later be retrieved with ByIndex(indexerName, indexerName).
func AddNodeSelectorIndexer(nodeInformer cache.SharedIndexInformer, indexerName string, nodeSelector labels.Selector) error {
	return nodeInformer.AddIndexers(cache.Indexers{
		indexerName: func(obj interface{}) ([]string, error) {
			node, ok := obj.(*v1.Node)
			if !ok {
				return []string{}, errors.New("unexpected object")
			}

			if nodeSelector.Matches(labels.Set(node.Labels)) {
				return []string{indexerName}, nil
			}

			return []string{}, nil
		},
	})
}
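
// Note (editorial): as exercised in TestAddNodeSelectorIndexer below, the indexer is added
// before the shared informer factory is started, roughly:
//
//	selector, _ := labels.Parse("type=compute")
//	_ = AddNodeSelectorIndexer(nodeInformer, "computeNodes", selector)
//	sharedInformerFactory.Start(stopCh)
//	sharedInformerFactory.WaitForCacheSync(stopCh)
//	objs, _ := nodeInformer.GetIndexer().ByIndex("computeNodes", "computeNodes")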

@@ -19,9 +19,12 @@ package node
import (
	"context"
	"errors"
	"sort"
	"strings"
	"sync"
	"testing"

	"github.com/google/go-cmp/cmp"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -102,6 +105,183 @@ func TestReadyNodesWithNodeSelector(t *testing.T) {
	}
}
func TestReadyNodesFromInterfaces(t *testing.T) {
	node1 := test.BuildTestNode("node1", 1000, 2000, 9, nil)
	node2 := test.BuildTestNode("node2", 1000, 2000, 9, nil)
	node2.Status.Conditions = []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionFalse}}
	node3 := test.BuildTestNode("node3", 1000, 2000, 9, nil)

	tests := []struct {
		description    string
		nodeInterfaces []interface{}
		expectedCount  int
		expectedNames  []string
		expectError    bool
		errorContains  string
	}{
		{
			description:    "All nodes are ready",
			nodeInterfaces: []interface{}{node1, node3},
			expectedCount:  2,
			expectedNames:  []string{"node1", "node3"},
			expectError:    false,
		},
		{
			description:    "One node is not ready",
			nodeInterfaces: []interface{}{node1, node2, node3},
			expectedCount:  2,
			expectedNames:  []string{"node1", "node3"},
			expectError:    false,
		},
		{
			description:    "Empty list",
			nodeInterfaces: []interface{}{},
			expectedCount:  0,
			expectedNames:  []string{},
			expectError:    false,
		},
		{
			description:    "Invalid type in list",
			nodeInterfaces: []interface{}{node1, "not a node", node3},
			expectedCount:  0,
			expectError:    true,
			errorContains:  "item at index 1 is not a *v1.Node",
		},
	}

	for _, tc := range tests {
		t.Run(tc.description, func(t *testing.T) {
			nodes, err := ReadyNodesFromInterfaces(tc.nodeInterfaces)

			if tc.expectError {
				if err == nil {
					t.Errorf("Expected error but got none")
				} else if tc.errorContains != "" && !strings.Contains(err.Error(), tc.errorContains) {
					t.Errorf("Expected error to contain '%s', got '%s'", tc.errorContains, err.Error())
				}
				return
			}

			if err != nil {
				t.Errorf("Unexpected error: %v", err)
			}

			if len(nodes) != tc.expectedCount {
				t.Errorf("Expected %d nodes, got %d", tc.expectedCount, len(nodes))
			}

			for i, expectedName := range tc.expectedNames {
				if i >= len(nodes) {
					t.Errorf("Missing node at index %d, expected %s", i, expectedName)
					continue
				}
				if nodes[i].Name != expectedName {
					t.Errorf("Expected node at index %d to be %s, got %s", i, expectedName, nodes[i].Name)
				}
			}
		})
	}
}
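
For readers skimming the diff, the behaviour pinned down by this test can be summarized with a rough sketch. It is inferred from the test expectations only (a type assertion with an index-qualified error message, then an order-preserving filter through IsReady, assuming a []*v1.Node return and an imported "fmt"), and is not necessarily the actual ReadyNodesFromInterfaces implementation:

// Sketch inferred from TestReadyNodesFromInterfaces above; not the real implementation.
func readyNodesFromInterfacesSketch(objs []interface{}) ([]*v1.Node, error) {
	readyNodes := make([]*v1.Node, 0, len(objs))
	for i, obj := range objs {
		node, ok := obj.(*v1.Node)
		if !ok {
			return nil, fmt.Errorf("item at index %d is not a *v1.Node", i)
		}
		if !IsReady(node) {
			// Not-ready nodes are skipped rather than treated as errors.
			continue
		}
		readyNodes = append(readyNodes, node)
	}
	return readyNodes, nil
}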

func TestAddNodeSelectorIndexer(t *testing.T) {
	node1 := test.BuildTestNode("node1", 1000, 2000, 9, nil)
	node1.Labels = map[string]string{"type": "compute", "zone": "us-east-1"}
	node2 := test.BuildTestNode("node2", 1000, 2000, 9, nil)
	node2.Labels = map[string]string{"type": "infra", "zone": "us-west-1"}
	node3 := test.BuildTestNode("node3", 1000, 2000, 9, nil)
	node3.Labels = map[string]string{"type": "compute", "zone": "us-west-1"}

	tests := []struct {
		description     string
		indexerName     string
		selectorString  string
		expectedMatches []string
	}{
		{
			description:     "Index nodes by type=compute",
			indexerName:     "computeNodes",
			selectorString:  "type=compute",
			expectedMatches: []string{"node1", "node3"},
		},
		{
			description:     "Index nodes by type=infra",
			indexerName:     "infraNodes",
			selectorString:  "type=infra",
			expectedMatches: []string{"node2"},
		},
		{
			description:     "Index nodes by zone=us-west-1",
			indexerName:     "westZoneNodes",
			selectorString:  "zone=us-west-1",
			expectedMatches: []string{"node2", "node3"},
		},
		{
			description:     "Index nodes with multiple labels",
			indexerName:     "computeEastNodes",
			selectorString:  "type=compute,zone=us-east-1",
			expectedMatches: []string{"node1"},
		},
		{
			description:     "No matching nodes",
			indexerName:     "noMatchNodes",
			selectorString:  "type=storage",
			expectedMatches: []string{},
		},
	}

	for _, tc := range tests {
		t.Run(tc.description, func(t *testing.T) {
			fakeClient := fake.NewSimpleClientset(node1, node2, node3)
			sharedInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
			nodeInformer := sharedInformerFactory.Core().V1().Nodes().Informer()

			selector, err := labels.Parse(tc.selectorString)
			if err != nil {
				t.Fatalf("Failed to parse selector: %v", err)
			}

			err = AddNodeSelectorIndexer(nodeInformer, tc.indexerName, selector)
			if err != nil {
				t.Fatalf("AddNodeSelectorIndexer failed: %v", err)
			}

			stopChannel := make(chan struct{})
			sharedInformerFactory.Start(stopChannel)
			sharedInformerFactory.WaitForCacheSync(stopChannel)
			defer close(stopChannel)

			indexer := nodeInformer.GetIndexer()
			objs, err := indexer.ByIndex(tc.indexerName, tc.indexerName)
			if err != nil {
				t.Errorf("Failed to query indexer: %v", err)
				return
			}

			// Extract node names from the results
			actualMatches := make([]string, 0, len(objs))
			for _, obj := range objs {
				node, ok := obj.(*v1.Node)
				if !ok {
					t.Errorf("Expected *v1.Node, got %T", obj)
					continue
				}
				actualMatches = append(actualMatches, node.Name)
			}

			// Sort both slices for consistent comparison
			sort.Strings(actualMatches)
			expectedMatches := make([]string, len(tc.expectedMatches))
			copy(expectedMatches, tc.expectedMatches)
			sort.Strings(expectedMatches)

			// Compare using cmp.Diff
			if diff := cmp.Diff(expectedMatches, actualMatches); diff != "" {
				t.Errorf("Node matches mismatch (-want +got):\n%s", diff)
			}
		})
	}
}
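
Taken together, the two tests above suggest the intended composition: query the node informer's selector-based index, then reduce the result to ready nodes. The wrapper below is hypothetical and only illustrates that composition, assuming ReadyNodesFromInterfaces returns ([]*v1.Node, error) as the earlier test implies:

// readyNodesByIndexSketch is a hypothetical composition of the helpers exercised above; not part of this change.
func readyNodesByIndexSketch(nodeInformer cache.SharedIndexInformer, indexerName string) ([]*v1.Node, error) {
	// Nodes matching the registered selector live under the index value equal to the indexer name.
	objs, err := nodeInformer.GetIndexer().ByIndex(indexerName, indexerName)
	if err != nil {
		return nil, err
	}
	return ReadyNodesFromInterfaces(objs)
}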

func TestIsNodeUnschedulable(t *testing.T) {
	tests := []struct {
		description string

@@ -43,6 +43,10 @@ import (
// BuildTestPod creates a test pod with given parameters.
func BuildTestPod(name string, cpu, memory int64, nodeName string, apply func(*v1.Pod)) *v1.Pod {
	pod := &v1.Pod{
		TypeMeta: metav1.TypeMeta{
			APIVersion: "v1",
			Kind:       "Pod",
		},
		ObjectMeta: metav1.ObjectMeta{
			Namespace: "default",
			Name:      name,
@@ -76,6 +80,10 @@ func BuildTestPod(name string, cpu, memory int64, nodeName string, apply func(*v
func BuildTestPDB(name, appLabel string) *policyv1.PodDisruptionBudget {
	maxUnavailable := intstr.FromInt32(1)
	pdb := &policyv1.PodDisruptionBudget{
		TypeMeta: metav1.TypeMeta{
			APIVersion: "policy/v1",
			Kind:       "PodDisruptionBudget",
		},
		ObjectMeta: metav1.ObjectMeta{
			Namespace: "default",
			Name:      name,
@@ -94,6 +102,10 @@ func BuildTestPDB(name, appLabel string) *policyv1.PodDisruptionBudget {

func BuildTestPVC(name, storageClass string) *v1.PersistentVolumeClaim {
	pvc := &v1.PersistentVolumeClaim{
		TypeMeta: metav1.TypeMeta{
			APIVersion: "v1",
			Kind:       "PersistentVolumeClaim",
		},
		ObjectMeta: metav1.ObjectMeta{
			Namespace: "default",
			Name:      name,
@@ -114,6 +126,10 @@ func BuildTestPVC(name, storageClass string) *v1.PersistentVolumeClaim {
// BuildPodMetrics creates a test podmetrics with given parameters.
func BuildPodMetrics(name string, millicpu, mem int64) *v1beta1.PodMetrics {
	return &v1beta1.PodMetrics{
		TypeMeta: metav1.TypeMeta{
			APIVersion: "metrics.k8s.io/v1beta1",
			Kind:       "PodMetrics",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: "default",
@@ -171,6 +187,10 @@ func GetDaemonSetOwnerRefList() []metav1.OwnerReference {
// BuildTestNode creates a node with specified capacity.
func BuildTestNode(name string, millicpu, mem, pods int64, apply func(*v1.Node)) *v1.Node {
	node := &v1.Node{
		TypeMeta: metav1.TypeMeta{
			APIVersion: "v1",
			Kind:       "Node",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:     name,
			SelfLink: fmt.Sprintf("/api/v1/nodes/%s", name),
@@ -201,6 +221,10 @@ func BuildTestNode(name string, millicpu, mem, pods int64, apply func(*v1.Node))

func BuildNodeMetrics(name string, millicpu, mem int64) *v1beta1.NodeMetrics {
	return &v1beta1.NodeMetrics{
		TypeMeta: metav1.TypeMeta{
			APIVersion: "metrics.k8s.io/v1beta1",
			Kind:       "NodeMetrics",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name: name,
		},