mirror of https://github.com/kubernetes-sigs/descheduler.git
feat: register a node indexer for the global node selector instead of listing nodes with the selector
This avoids iterating through every node each time a list of matching nodes is requested, and is prerequisite work for introducing profile-level node selectors.
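For context, the core idea is to replace "list every node, then filter by selector" with a cache indexer whose index function emits a constant key only for nodes that match the selector, so the matching set can be fetched directly from the informer cache via ByIndex. The snippet below is an illustrative, self-contained sketch of that client-go pattern; the index name, node names, and labels are made up for the example and it is not the descheduler's actual wiring.

// Minimal sketch of the indexer-based node selection pattern (assumed example,
// not taken from this commit): the index function keys matching nodes under a
// constant name, so ByIndex returns them without scanning the whole node list.
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

const nodeSelectorIndex = "node_selector_global" // illustrative index name

func main() {
	// Two fake nodes; only the first matches the selector below.
	prod := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n1", Labels: map[string]string{"environment": "production"}}}
	stage := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n2", Labels: map[string]string{"environment": "staging"}}}

	client := fake.NewSimpleClientset(prod, stage)
	factory := informers.NewSharedInformerFactory(client, 0)
	nodeInformer := factory.Core().V1().Nodes().Informer()

	selector, err := labels.Parse("environment=production")
	if err != nil {
		panic(err)
	}

	// Register the indexer before starting the informer: matching nodes are
	// indexed under the constant key, non-matching nodes under no key at all.
	err = nodeInformer.AddIndexers(cache.Indexers{
		nodeSelectorIndex: func(obj interface{}) ([]string, error) {
			node, ok := obj.(*v1.Node)
			if !ok {
				return nil, fmt.Errorf("expected *v1.Node, got %T", obj)
			}
			if selector.Matches(labels.Set(node.Labels)) {
				return []string{nodeSelectorIndex}, nil
			}
			return nil, nil
		},
	})
	if err != nil {
		panic(err)
	}

	stopCh := make(chan struct{})
	defer close(stopCh)
	factory.Start(stopCh)
	factory.WaitForCacheSync(stopCh)

	// Fetch only the matching nodes from the cache; no list-and-filter pass.
	objs, err := nodeInformer.GetIndexer().ByIndex(nodeSelectorIndex, nodeSelectorIndex)
	if err != nil {
		panic(err)
	}
	for _, obj := range objs {
		fmt.Println(obj.(*v1.Node).Name) // prints only "n1"
	}
}

With this in place, each descheduling cycle can read the pre-filtered node set straight from the informer's index instead of walking every node and re-applying the selector.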
@@ -74,6 +74,7 @@ import (
const (
prometheusAuthTokenSecretKey = "prometheusAuthToken"
workQueueKey = "key"
indexerNodeSelectorGlobal = "indexer_node_selector_global"
)

type eprunner func(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status
@@ -206,15 +207,20 @@ func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedu
metricsProviders: metricsProviderListToMap(deschedulerPolicy.MetricsProviders),
}

if rs.MetricsClient != nil {
nodeSelector := labels.Everything()
if deschedulerPolicy.NodeSelector != nil {
sel, err := labels.Parse(*deschedulerPolicy.NodeSelector)
if err != nil {
return nil, err
}
nodeSelector = sel
nodeSelector := labels.Everything()
if deschedulerPolicy.NodeSelector != nil {
sel, err := labels.Parse(*deschedulerPolicy.NodeSelector)
if err != nil {
return nil, err
}
nodeSelector = sel
}

if err := nodeutil.AddNodeSelectorIndexer(sharedInformerFactory.Core().V1().Nodes().Informer(), indexerNodeSelectorGlobal, nodeSelector); err != nil {
return nil, err
}

if rs.MetricsClient != nil {
desch.metricsCollector = metricscollector.NewMetricsCollector(sharedInformerFactory.Core().V1().Nodes().Lister(), rs.MetricsClient, nodeSelector)
}

@@ -345,7 +351,7 @@ func (d *descheduler) eventHandler() cache.ResourceEventHandler {
}
}

func (d *descheduler) runDeschedulerLoop(ctx context.Context, nodes []*v1.Node) error {
func (d *descheduler) runDeschedulerLoop(ctx context.Context) error {
var span trace.Span
ctx, span = tracing.Tracer().Start(ctx, "runDeschedulerLoop")
defer span.End()
@@ -354,12 +360,6 @@ func (d *descheduler) runDeschedulerLoop(ctx context.Context, nodes []*v1.Node)
metrics.LoopDuration.With(map[string]string{}).Observe(time.Since(loopStartDuration).Seconds())
}(time.Now())

// if len is still <= 1 error out
if len(nodes) <= 1 {
klog.InfoS("Skipping descheduling cycle: requires >=2 nodes", "found", len(nodes))
return nil // gracefully skip this cycle instead of aborting
}

var client clientset.Interface
// When the dry mode is enable, collect all the relevant objects (mostly pods) under a fake client.
// So when evicting pods while running multiple strategies in a row have the cummulative effect
@@ -384,6 +384,22 @@ func (d *descheduler) runDeschedulerLoop(ctx context.Context, nodes []*v1.Node)
return fmt.Errorf("build get pods assigned to node function error: %v", err)
}

nodeSelector := labels.Everything()
if d.deschedulerPolicy.NodeSelector != nil {
sel, err := labels.Parse(*d.deschedulerPolicy.NodeSelector)
if err != nil {
return err
}
nodeSelector = sel
}
// TODO(ingvagabund): copy paste all relevant indexers from the real client to the fake one
// TODO(ingvagabund): register one indexer per each profile. Respect the precedence of no profile-level node selector is specified.
// Also, keep a cache of node label selectors to detect duplicates to avoid creating an extra informer.

if err := nodeutil.AddNodeSelectorIndexer(fakeSharedInformerFactory.Core().V1().Nodes().Informer(), indexerNodeSelectorGlobal, nodeSelector); err != nil {
return err
}

fakeCtx, cncl := context.WithCancel(context.TODO())
defer cncl()
fakeSharedInformerFactory.Start(fakeCtx.Done())
@@ -399,7 +415,7 @@ func (d *descheduler) runDeschedulerLoop(ctx context.Context, nodes []*v1.Node)
d.podEvictor.SetClient(client)
d.podEvictor.ResetCounters()

d.runProfiles(ctx, client, nodes)
d.runProfiles(ctx, client)

klog.V(1).InfoS("Number of evictions/requests", "totalEvicted", d.podEvictor.TotalEvicted(), "evictionRequests", d.podEvictor.TotalEvictionRequests())
@@ -409,10 +425,31 @@ func (d *descheduler) runDeschedulerLoop(ctx context.Context, nodes []*v1.Node)
// runProfiles runs all the deschedule plugins of all profiles and
// later runs through all balance plugins of all profiles. (All Balance plugins should come after all Deschedule plugins)
// see https://github.com/kubernetes-sigs/descheduler/issues/979
func (d *descheduler) runProfiles(ctx context.Context, client clientset.Interface, nodes []*v1.Node) {
func (d *descheduler) runProfiles(ctx context.Context, client clientset.Interface) {
var span trace.Span
ctx, span = tracing.Tracer().Start(ctx, "runProfiles")
defer span.End()

nodesAsInterface, err := d.sharedInformerFactory.Core().V1().Nodes().Informer().GetIndexer().ByIndex(indexerNodeSelectorGlobal, indexerNodeSelectorGlobal)
if err != nil {
span.AddEvent("Failed to list nodes with global node selector", trace.WithAttributes(attribute.String("err", err.Error())))
klog.Error(err)
return
}

nodes, err := nodeutil.ReadyNodesFromInterfaces(nodesAsInterface)
if err != nil {
span.AddEvent("Failed to convert node as interfaces into ready nodes", trace.WithAttributes(attribute.String("err", err.Error())))
klog.Error(err)
return
}

// if len is still <= 1 error out
if len(nodes) <= 1 {
klog.InfoS("Skipping descheduling cycle: requires >=2 nodes", "found", len(nodes))
return // gracefully skip this cycle instead of aborting
}

var profileRunners []profileRunner
for _, profile := range d.deschedulerPolicy.Profiles {
currProfile, err := frameworkprofile.NewProfile(
@@ -587,11 +624,6 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer

sharedInformerFactory := informers.NewSharedInformerFactoryWithOptions(rs.Client, 0, informers.WithTransform(trimManagedFields))

var nodeSelector string
if deschedulerPolicy.NodeSelector != nil {
nodeSelector = *deschedulerPolicy.NodeSelector
}

var eventClient clientset.Interface
if rs.DryRun {
eventClient = fakeclientset.NewSimpleClientset()
@@ -666,14 +698,7 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
sCtx, sSpan := tracing.Tracer().Start(ctx, "NonSlidingUntil")
defer sSpan.End()

nodes, err := nodeutil.ReadyNodes(sCtx, rs.Client, descheduler.sharedInformerFactory.Core().V1().Nodes().Lister(), nodeSelector)
if err != nil {
sSpan.AddEvent("Failed to detect ready nodes", trace.WithAttributes(attribute.String("err", err.Error())))
klog.Error(err)
cancel()
return
}
err = descheduler.runDeschedulerLoop(sCtx, nodes)
err = descheduler.runDeschedulerLoop(sCtx)
if err != nil {
sSpan.AddEvent("Failed to run descheduler loop", trace.WithAttributes(attribute.String("err", err.Error())))
klog.Error(err)
@@ -480,7 +480,6 @@ func TestPodEvictorReset(t *testing.T) {
ctx := context.Background()
node1 := test.BuildTestNode("n1", 2000, 3000, 10, taintNodeNoSchedule)
node2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
nodes := []*v1.Node{node1, node2}

ownerRef1 := test.GetReplicaSetOwnerRefList()
updatePod := func(pod *v1.Pod) {
@@ -506,7 +505,7 @@ func TestPodEvictorReset(t *testing.T) {

// a single pod eviction expected
klog.Infof("2 pod eviction expected per a descheduling cycle, 2 real evictions in total")
if err := descheduler.runDeschedulerLoop(ctx, nodes); err != nil {
if err := descheduler.runDeschedulerLoop(ctx); err != nil {
t.Fatalf("Unable to run a descheduling loop: %v", err)
}
if descheduler.podEvictor.TotalEvicted() != 2 || len(evictedPods) != 2 || len(fakeEvictedPods) != 0 {
@@ -515,7 +514,7 @@ func TestPodEvictorReset(t *testing.T) {

// a single pod eviction expected
klog.Infof("2 pod eviction expected per a descheduling cycle, 4 real evictions in total")
if err := descheduler.runDeschedulerLoop(ctx, nodes); err != nil {
if err := descheduler.runDeschedulerLoop(ctx); err != nil {
t.Fatalf("Unable to run a descheduling loop: %v", err)
}
if descheduler.podEvictor.TotalEvicted() != 2 || len(evictedPods) != 4 || len(fakeEvictedPods) != 0 {
@@ -528,7 +527,7 @@ func TestPodEvictorReset(t *testing.T) {
evictedPods = []string{}

klog.Infof("2 pod eviction expected per a descheduling cycle, 2 fake evictions in total")
if err := descheduler.runDeschedulerLoop(ctx, nodes); err != nil {
if err := descheduler.runDeschedulerLoop(ctx); err != nil {
t.Fatalf("Unable to run a descheduling loop: %v", err)
}
if descheduler.podEvictor.TotalEvicted() != 2 || len(evictedPods) != 0 || len(fakeEvictedPods) != 2 {
@@ -536,7 +535,7 @@ func TestPodEvictorReset(t *testing.T) {
}

klog.Infof("2 pod eviction expected per a descheduling cycle, 4 fake evictions in total")
if err := descheduler.runDeschedulerLoop(ctx, nodes); err != nil {
if err := descheduler.runDeschedulerLoop(ctx); err != nil {
t.Fatalf("Unable to run a descheduling loop: %v", err)
}
if descheduler.podEvictor.TotalEvicted() != 2 || len(evictedPods) != 0 || len(fakeEvictedPods) != 4 {
@@ -555,7 +554,7 @@ func checkTotals(t *testing.T, ctx context.Context, descheduler *descheduler, to
}

func runDeschedulingCycleAndCheckTotals(t *testing.T, ctx context.Context, nodes []*v1.Node, descheduler *descheduler, totalEvictionRequests, totalEvicted uint) {
err := descheduler.runDeschedulerLoop(ctx, nodes)
err := descheduler.runDeschedulerLoop(ctx)
if err != nil {
t.Fatalf("Unable to run a descheduling loop: %v", err)
}
@@ -731,7 +730,6 @@ func TestDeschedulingLimits(t *testing.T) {
ctx := context.Background()
node1 := test.BuildTestNode("n1", 2000, 3000, 10, taintNodeNoSchedule)
node2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
nodes := []*v1.Node{node1, node2}
ctxCancel, cancel := context.WithCancel(ctx)
featureGates := featuregate.NewFeatureGate()
featureGates.Add(map[featuregate.Feature]featuregate.FeatureSpec{
@@ -774,7 +772,7 @@ func TestDeschedulingLimits(t *testing.T) {
time.Sleep(100 * time.Millisecond)

klog.Infof("2 evictions in background expected, 2 normal evictions")
err := descheduler.runDeschedulerLoop(ctx, nodes)
err := descheduler.runDeschedulerLoop(ctx)
if err != nil {
t.Fatalf("Unable to run a descheduling loop: %v", err)
}
@@ -790,6 +788,222 @@ func TestDeschedulingLimits(t *testing.T) {
}
}
func TestNodeLabelSelectorBasedEviction(t *testing.T) {
initPluginRegistry()

// createNodes creates 4 nodes with different labels and applies a taint to all of them
createNodes := func() (*v1.Node, *v1.Node, *v1.Node, *v1.Node) {
taint := []v1.Taint{
{
Key: "test-taint",
Value: "test-value",
Effect: v1.TaintEffectNoSchedule,
},
}
node1 := test.BuildTestNode("n1", 2000, 3000, 10, func(node *v1.Node) {
node.Labels = map[string]string{
"zone": "us-east-1a",
"node-type": "compute",
"environment": "production",
}
node.Spec.Taints = taint
})
node2 := test.BuildTestNode("n2", 2000, 3000, 10, func(node *v1.Node) {
node.Labels = map[string]string{
"zone": "us-east-1b",
"node-type": "compute",
"environment": "production",
}
node.Spec.Taints = taint
})
node3 := test.BuildTestNode("n3", 2000, 3000, 10, func(node *v1.Node) {
node.Labels = map[string]string{
"zone": "us-west-1a",
"node-type": "storage",
"environment": "staging",
}
node.Spec.Taints = taint
})
node4 := test.BuildTestNode("n4", 2000, 3000, 10, func(node *v1.Node) {
node.Labels = map[string]string{
"zone": "us-west-1b",
"node-type": "storage",
"environment": "staging",
}
node.Spec.Taints = taint
})
return node1, node2, node3, node4
}

tests := []struct {
description string
nodeSelector string
dryRun bool
expectedEvictedFromNodes []string
}{
{
description: "Evict from n1, n2",
nodeSelector: "environment=production",
dryRun: false,
expectedEvictedFromNodes: []string{"n1", "n2"},
},
{
description: "Evict from n1, n2 in dry run mode",
nodeSelector: "environment=production",
dryRun: true,
expectedEvictedFromNodes: []string{"n1", "n2"},
},
{
description: "Evict from n3, n4",
nodeSelector: "environment=staging",
dryRun: false,
expectedEvictedFromNodes: []string{"n3", "n4"},
},
{
description: "Evict from n3, n4 in dry run mode",
nodeSelector: "environment=staging",
dryRun: true,
expectedEvictedFromNodes: []string{"n3", "n4"},
},
{
description: "Evict from n1, n4",
nodeSelector: "zone in (us-east-1a, us-west-1b)",
dryRun: false,
expectedEvictedFromNodes: []string{"n1", "n4"},
},
{
description: "Evict from n1, n4 in dry run mode",
nodeSelector: "zone in (us-east-1a, us-west-1b)",
dryRun: true,
expectedEvictedFromNodes: []string{"n1", "n4"},
},
{
description: "Evict from n2, n3",
nodeSelector: "zone in (us-east-1b, us-west-1a)",
dryRun: false,
expectedEvictedFromNodes: []string{"n2", "n3"},
},
{
description: "Evict from n2, n3 in dry run mode",
nodeSelector: "zone in (us-east-1b, us-west-1a)",
dryRun: true,
expectedEvictedFromNodes: []string{"n2", "n3"},
},
{
description: "Evict from all nodes",
nodeSelector: "",
dryRun: false,
expectedEvictedFromNodes: []string{"n1", "n2", "n3", "n4"},
},
{
description: "Evict from all nodes in dry run mode",
nodeSelector: "",
dryRun: true,
expectedEvictedFromNodes: []string{"n1", "n2", "n3", "n4"},
},
}
for _, tc := range tests {
t.Run(tc.description, func(t *testing.T) {
ctx := context.Background()

// Create nodes with different labels and taints
node1, node2, node3, node4 := createNodes()

ownerRef := test.GetReplicaSetOwnerRefList()
updatePod := func(pod *v1.Pod) {
pod.ObjectMeta.OwnerReferences = ownerRef
pod.Status.Phase = v1.PodRunning
}

// Create one pod per node
p1 := test.BuildTestPod("p1", 200, 0, node1.Name, updatePod)
p2 := test.BuildTestPod("p2", 200, 0, node2.Name, updatePod)
p3 := test.BuildTestPod("p3", 200, 0, node3.Name, updatePod)
p4 := test.BuildTestPod("p4", 200, 0, node4.Name, updatePod)

// Map pod names to their node names for validation
podToNode := map[string]string{
"p1": "n1",
"p2": "n2",
"p3": "n3",
"p4": "n4",
}

policy := removePodsViolatingNodeTaintsPolicy()
if tc.nodeSelector != "" {
policy.NodeSelector = &tc.nodeSelector
}

ctxCancel, cancel := context.WithCancel(ctx)
rs, deschedulerInstance, client := initDescheduler(t, ctxCancel, initFeatureGates(), policy, nil, node1, node2, node3, node4, p1, p2, p3, p4)
defer cancel()

// Set dry run mode if specified
rs.DryRun = tc.dryRun

// Verify all pods are created initially
pods, err := client.CoreV1().Pods(p1.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {
t.Fatalf("Unable to list pods: %v", err)
}
if len(pods.Items) != 4 {
t.Errorf("Expected 4 pods initially, got %d", len(pods.Items))
}

var evictedPods []string
if !tc.dryRun {
client.PrependReactor("create", "pods", podEvictionReactionTestingFnc(&evictedPods, nil, nil))
} else {
deschedulerInstance.podEvictionReactionFnc = func(*fakeclientset.Clientset) func(action core.Action) (bool, runtime.Object, error) {
return podEvictionReactionTestingFnc(&evictedPods, nil, nil)
}
}

// Run descheduler
if err := deschedulerInstance.runDeschedulerLoop(ctx); err != nil {
t.Fatalf("Unable to run descheduler loop: %v", err)
}

// Collect which nodes had pods evicted from them
nodesWithEvictedPods := make(map[string]bool)
for _, podName := range evictedPods {
if nodeName, ok := podToNode[podName]; ok {
nodesWithEvictedPods[nodeName] = true
}
}

// Verify the correct number of nodes had pods evicted
if len(nodesWithEvictedPods) != len(tc.expectedEvictedFromNodes) {
t.Errorf("Expected pods to be evicted from %d nodes, got %d nodes: %v", len(tc.expectedEvictedFromNodes), len(nodesWithEvictedPods), nodesWithEvictedPods)
}

// Verify pods were evicted from the correct nodes
for _, nodeName := range tc.expectedEvictedFromNodes {
if !nodesWithEvictedPods[nodeName] {
t.Errorf("Expected pod to be evicted from node %s, but it was not", nodeName)
}
}

// Verify no unexpected nodes had pods evicted
for nodeName := range nodesWithEvictedPods {
found := false
for _, expectedNode := range tc.expectedEvictedFromNodes {
if nodeName == expectedNode {
found = true
break
}
}
if !found {
t.Errorf("Unexpected eviction from node %s", nodeName)
}
}

t.Logf("Successfully evicted pods from nodes: %v", tc.expectedEvictedFromNodes)
})
}
}
func TestLoadAwareDescheduling(t *testing.T) {
initPluginRegistry()

@@ -801,7 +1015,6 @@ func TestLoadAwareDescheduling(t *testing.T) {
ctx := context.Background()
node1 := test.BuildTestNode("n1", 2000, 3000, 10, taintNodeNoSchedule)
node2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
nodes := []*v1.Node{node1, node2}

p1 := test.BuildTestPod("p1", 300, 0, node1.Name, updatePod)
p2 := test.BuildTestPod("p2", 300, 0, node1.Name, updatePod)
@@ -857,7 +1070,7 @@ func TestLoadAwareDescheduling(t *testing.T) {
// after newDescheduler in RunDeschedulerStrategies.
descheduler.metricsCollector.Collect(ctx)

err := descheduler.runDeschedulerLoop(ctx, nodes)
err := descheduler.runDeschedulerLoop(ctx)
if err != nil {
t.Fatalf("Unable to run a descheduling loop: %v", err)
}

@@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
clientset "k8s.io/client-go/kubernetes"
listersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"sigs.k8s.io/descheduler/pkg/api"
@@ -78,6 +79,22 @@ func ReadyNodes(ctx context.Context, client clientset.Interface, nodeLister list
return readyNodes, nil
}

// ReadyNodesFromInterfaces converts a list of interface{} items to ready nodes.
// Each interface{} item is expected to be a *v1.Node. Only ready nodes are returned.
func ReadyNodesFromInterfaces(nodeInterfaces []interface{}) ([]*v1.Node, error) {
readyNodes := make([]*v1.Node, 0, len(nodeInterfaces))
for i, nodeInterface := range nodeInterfaces {
node, ok := nodeInterface.(*v1.Node)
if !ok {
return nil, fmt.Errorf("item at index %d is not a *v1.Node", i)
}
if IsReady(node) {
readyNodes = append(readyNodes, node)
}
}
return readyNodes, nil
}

// IsReady checks if the descheduler could run against given node.
func IsReady(node *v1.Node) bool {
for i := range node.Status.Conditions {
@@ -400,3 +417,22 @@ func podMatchesInterPodAntiAffinity(nodeIndexer podutil.GetPodsAssignedToNodeFun

return false, nil
}

// AddNodeSelectorIndexer registers an indexer named indexerName on the given node informer.
// Nodes whose labels match nodeSelector are indexed under the indexerName key.
func AddNodeSelectorIndexer(nodeInformer cache.SharedIndexInformer, indexerName string, nodeSelector labels.Selector) error {
	return nodeInformer.AddIndexers(cache.Indexers{
indexerName: func(obj interface{}) ([]string, error) {
node, ok := obj.(*v1.Node)
if !ok {
return []string{}, errors.New("unexpected object")
}

if nodeSelector.Matches(labels.Set(node.Labels)) {
return []string{indexerName}, nil
}

return []string{}, nil
},
})
}
@@ -19,12 +19,16 @@ package node
import (
"context"
"errors"
"sort"
"strings"
"sync"
"testing"

"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
@@ -86,6 +90,183 @@ func TestReadyNodesWithNodeSelector(t *testing.T) {
}
}

func TestReadyNodesFromInterfaces(t *testing.T) {
node1 := test.BuildTestNode("node1", 1000, 2000, 9, nil)
node2 := test.BuildTestNode("node2", 1000, 2000, 9, nil)
node2.Status.Conditions = []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionFalse}}
node3 := test.BuildTestNode("node3", 1000, 2000, 9, nil)

tests := []struct {
description string
nodeInterfaces []interface{}
expectedCount int
expectedNames []string
expectError bool
errorContains string
}{
{
description: "All nodes are ready",
nodeInterfaces: []interface{}{node1, node3},
expectedCount: 2,
expectedNames: []string{"node1", "node3"},
expectError: false,
},
{
description: "One node is not ready",
nodeInterfaces: []interface{}{node1, node2, node3},
expectedCount: 2,
expectedNames: []string{"node1", "node3"},
expectError: false,
},
{
description: "Empty list",
nodeInterfaces: []interface{}{},
expectedCount: 0,
expectedNames: []string{},
expectError: false,
},
{
description: "Invalid type in list",
nodeInterfaces: []interface{}{node1, "not a node", node3},
expectedCount: 0,
expectError: true,
errorContains: "item at index 1 is not a *v1.Node",
},
}

for _, tc := range tests {
t.Run(tc.description, func(t *testing.T) {
nodes, err := ReadyNodesFromInterfaces(tc.nodeInterfaces)

if tc.expectError {
if err == nil {
t.Errorf("Expected error but got none")
} else if tc.errorContains != "" && !strings.Contains(err.Error(), tc.errorContains) {
t.Errorf("Expected error to contain '%s', got '%s'", tc.errorContains, err.Error())
}
return
}

if err != nil {
t.Errorf("Unexpected error: %v", err)
}

if len(nodes) != tc.expectedCount {
t.Errorf("Expected %d nodes, got %d", tc.expectedCount, len(nodes))
}

for i, expectedName := range tc.expectedNames {
if i >= len(nodes) {
t.Errorf("Missing node at index %d, expected %s", i, expectedName)
continue
}
if nodes[i].Name != expectedName {
t.Errorf("Expected node at index %d to be %s, got %s", i, expectedName, nodes[i].Name)
}
}
})
}
}
func TestAddNodeSelectorIndexer(t *testing.T) {
node1 := test.BuildTestNode("node1", 1000, 2000, 9, nil)
node1.Labels = map[string]string{"type": "compute", "zone": "us-east-1"}
node2 := test.BuildTestNode("node2", 1000, 2000, 9, nil)
node2.Labels = map[string]string{"type": "infra", "zone": "us-west-1"}
node3 := test.BuildTestNode("node3", 1000, 2000, 9, nil)
node3.Labels = map[string]string{"type": "compute", "zone": "us-west-1"}

tests := []struct {
description string
indexerName string
selectorString string
expectedMatches []string
}{
{
description: "Index nodes by type=compute",
indexerName: "computeNodes",
selectorString: "type=compute",
expectedMatches: []string{"node1", "node3"},
},
{
description: "Index nodes by type=infra",
indexerName: "infraNodes",
selectorString: "type=infra",
expectedMatches: []string{"node2"},
},
{
description: "Index nodes by zone=us-west-1",
indexerName: "westZoneNodes",
selectorString: "zone=us-west-1",
expectedMatches: []string{"node2", "node3"},
},
{
description: "Index nodes with multiple labels",
indexerName: "computeEastNodes",
selectorString: "type=compute,zone=us-east-1",
expectedMatches: []string{"node1"},
},
{
description: "No matching nodes",
indexerName: "noMatchNodes",
selectorString: "type=storage",
expectedMatches: []string{},
},
}

for _, tc := range tests {
t.Run(tc.description, func(t *testing.T) {
fakeClient := fake.NewSimpleClientset(node1, node2, node3)
sharedInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
nodeInformer := sharedInformerFactory.Core().V1().Nodes().Informer()

selector, err := labels.Parse(tc.selectorString)
if err != nil {
t.Fatalf("Failed to parse selector: %v", err)
}

err = AddNodeSelectorIndexer(nodeInformer, tc.indexerName, selector)
if err != nil {
t.Fatalf("AddNodeSelectorIndexer failed: %v", err)
}

stopChannel := make(chan struct{})
sharedInformerFactory.Start(stopChannel)
sharedInformerFactory.WaitForCacheSync(stopChannel)
defer close(stopChannel)

indexer := nodeInformer.GetIndexer()
objs, err := indexer.ByIndex(tc.indexerName, tc.indexerName)
if err != nil {
t.Errorf("Failed to query indexer: %v", err)
return
}

// Extract node names from the results
actualMatches := make([]string, 0, len(objs))
for _, obj := range objs {
node, ok := obj.(*v1.Node)
if !ok {
t.Errorf("Expected *v1.Node, got %T", obj)
continue
}
actualMatches = append(actualMatches, node.Name)
}

// Sort both slices for consistent comparison
sort.Strings(actualMatches)
expectedMatches := make([]string, len(tc.expectedMatches))
copy(expectedMatches, tc.expectedMatches)
sort.Strings(expectedMatches)

// Compare using cmp.Diff
if diff := cmp.Diff(expectedMatches, actualMatches); diff != "" {
t.Errorf("Node matches mismatch (-want +got):\n%s", diff)
}
})
}
}

func TestIsNodeUnschedulable(t *testing.T) {
tests := []struct {
description string