Mirror of https://github.com/kubernetes-sigs/descheduler.git (synced 2026-01-26 05:14:13 +01:00)
use pod informers for listing pods in removepodsviolatingtopologyspreadconstraint and removepodsviolatinginterpodantiaffinity (#1163)
* use pod informers for listing pods in removepodsviolatingtopologyspreadconstraint and removepodsviolatinginterpodantiaffinity

* workaround in topologyspreadconstraint test to ensure that informer's index returns pods sorted by name

Signed-off-by: Amir Alavi <amiralavi7@gmail.com>
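For context, the change leans on a pattern client-go already provides: a shared pod informer whose local cache is indexed by node name, so "which pods run on node X" is answered from the informer cache instead of a cluster-wide Pods().List call against the API server. Below is a minimal, self-contained sketch of that pattern; the index name "byNodeName", the in-cluster config wiring, and the error handling are illustrative assumptions, not the descheduler's actual BuildGetPodsAssignedToNodeFunc implementation.

package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// Illustrative wiring only: in-cluster config and a clientset.
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	factory := informers.NewSharedInformerFactory(client, 0)
	podInformer := factory.Core().V1().Pods().Informer()

	// Add a node-name index before the informer starts; the descheduler
	// builds something similar inside BuildGetPodsAssignedToNodeFunc.
	if err := podInformer.AddIndexers(cache.Indexers{
		"byNodeName": func(obj interface{}) ([]string, error) {
			pod, ok := obj.(*v1.Pod)
			if !ok || pod.Spec.NodeName == "" {
				return nil, nil
			}
			return []string{pod.Spec.NodeName}, nil
		},
	}); err != nil {
		panic(err)
	}

	ctx := context.Background()
	factory.Start(ctx.Done())
	factory.WaitForCacheSync(ctx.Done())

	// Answer "pods on node-1" from the local cache, no API round trip.
	objs, err := podInformer.GetIndexer().ByIndex("byNodeName", "node-1")
	if err != nil {
		panic(err)
	}
	for _, obj := range objs {
		pod := obj.(*v1.Pod)
		fmt.Println(pod.Namespace + "/" + pod.Name)
	}
}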
@@ -157,6 +157,20 @@ func BuildGetPodsAssignedToNodeFunc(podInformer cache.SharedIndexInformer) (GetP
 	return getPodsAssignedToNode, nil
 }
 
+// ListPodsOnNodes returns all pods on given nodes.
+func ListPodsOnNodes(nodes []*v1.Node, getPodsAssignedToNode GetPodsAssignedToNodeFunc, filter FilterFunc) ([]*v1.Pod, error) {
+	pods := make([]*v1.Pod, 0)
+	for _, node := range nodes {
+		podsOnNode, err := ListPodsOnANode(node.Name, getPodsAssignedToNode, filter)
+		if err != nil {
+			return nil, err
+		}
+
+		pods = append(pods, podsOnNode...)
+	}
+	return pods, nil
+}
+
 // ListPodsOnANode lists all pods on a node.
 // It also accepts a "filter" function which can be used to further limit the pods that are returned.
 // (Usually this is podEvictor.Evictable().IsEvictable, in order to only list the evictable pods on a node, but can
@@ -187,6 +201,15 @@ func ListAllPodsOnANode(
 	return pods, nil
 }
 
+func GroupByNamespace(pods []*v1.Pod) map[string][]*v1.Pod {
+	m := make(map[string][]*v1.Pod)
+	for i := 0; i < len(pods); i++ {
+		pod := pods[i]
+		m[pod.Namespace] = append(m[pod.Namespace], pod)
+	}
+	return m
+}
+
 // OwnerRef returns the ownerRefList for the pod.
 func OwnerRef(pod *v1.Pod) []metav1.OwnerReference {
 	return pod.ObjectMeta.GetOwnerReferences()
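For orientation before the plugin diffs below, here is a hedged sketch of how the two helpers added above compose; the package import path, the function name listAndGroup, and the running-pods filter are assumptions for illustration, not code from this commit.

package example

import (
	"fmt"

	v1 "k8s.io/api/core/v1"

	podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
)

// listAndGroup lists pods on the given nodes through the informer-backed
// GetPodsAssignedToNodeFunc and buckets the result by namespace.
func listAndGroup(nodes []*v1.Node, getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc) (map[string][]*v1.Pod, error) {
	// Skip pods that have already finished running (illustrative filter).
	runningOnly := func(pod *v1.Pod) bool {
		return pod.Status.Phase != v1.PodSucceeded && pod.Status.Phase != v1.PodFailed
	}
	pods, err := podutil.ListPodsOnNodes(nodes, getPodsAssignedToNode, runningOnly)
	if err != nil {
		return nil, fmt.Errorf("error listing pods on nodes: %w", err)
	}
	return podutil.GroupByNamespace(pods), nil
}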
@@ -78,20 +78,15 @@ func (d *RemovePodsViolatingInterPodAntiAffinity) Name() string {
 }
 
 func (d *RemovePodsViolatingInterPodAntiAffinity) Deschedule(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status {
-	podsList, err := d.handle.ClientSet().CoreV1().Pods("").List(ctx, metav1.ListOptions{})
+	pods, err := podutil.ListPodsOnNodes(nodes, d.handle.GetPodsAssignedToNodeFunc(), d.podFilter)
 	if err != nil {
 		return &frameworktypes.Status{
 			Err: fmt.Errorf("error listing all pods: %v", err),
 		}
 	}
 
-	podsInANamespace := groupByNamespace(podsList)
-
-	runningPodFilter := func(pod *v1.Pod) bool {
-		return pod.Status.Phase != v1.PodSucceeded && pod.Status.Phase != v1.PodFailed
-	}
-	podsOnANode := groupByNodeName(podsList, podutil.WrapFilterFuncs(runningPodFilter, d.podFilter))
-
+	podsInANamespace := podutil.GroupByNamespace(pods)
+	podsOnANode := groupByNodeName(pods)
 	nodeMap := createNodeMap(nodes)
 
 loop:
@@ -136,25 +131,11 @@ func removePodFromNamespaceMap(podToRemove *v1.Pod, podMap map[string][]*v1.Pod)
 	return podMap
 }
 
-func groupByNamespace(pods *v1.PodList) map[string][]*v1.Pod {
+func groupByNodeName(pods []*v1.Pod) map[string][]*v1.Pod {
 	m := make(map[string][]*v1.Pod)
-	for i := 0; i < len(pods.Items); i++ {
-		pod := &(pods.Items[i])
-		m[pod.Namespace] = append(m[pod.Namespace], pod)
-	}
-	return m
-}
-
-func groupByNodeName(pods *v1.PodList, filter podutil.FilterFunc) map[string][]*v1.Pod {
-	m := make(map[string][]*v1.Pod)
-	if filter == nil {
-		filter = func(p *v1.Pod) bool { return true }
-	}
-	for i := 0; i < len(pods.Items); i++ {
-		pod := &(pods.Items[i])
-		if filter(pod) {
-			m[pod.Spec.NodeName] = append(m[pod.Spec.NodeName], pod)
-		}
+	for i := 0; i < len(pods); i++ {
+		pod := pods[i]
+		m[pod.Spec.NodeName] = append(m[pod.Spec.NodeName], pod)
 	}
 	return m
 }
@@ -102,15 +102,6 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) Balance(ctx context.Contex
 	// iterate through all topoPairs for this topologyKey and diff currentPods -minPods <=maxSkew
 	// if diff > maxSkew, add this pod in the current bucket for eviction
 
-	// First record all of the constraints by namespace
-	client := d.handle.ClientSet()
-	namespaces, err := client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
-	if err != nil {
-		klog.ErrorS(err, "Couldn't list namespaces")
-		return &frameworktypes.Status{
-			Err: fmt.Errorf("list namespace: %w", err),
-		}
-	}
 	klog.V(1).InfoS("Processing namespaces for topology spread constraints")
 	podsForEviction := make(map[*v1.Pod]struct{})
 	var includedNamespaces, excludedNamespaces sets.Set[string]
@@ -119,21 +110,25 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) Balance(ctx context.Contex
 		excludedNamespaces = sets.New(d.args.Namespaces.Exclude...)
 	}
 
-	// 1. for each namespace...
-	for _, namespace := range namespaces.Items {
-		if (len(includedNamespaces) > 0 && !includedNamespaces.Has(namespace.Name)) ||
-			(len(excludedNamespaces) > 0 && excludedNamespaces.Has(namespace.Name)) {
-			continue
+	pods, err := podutil.ListPodsOnNodes(nodes, d.handle.GetPodsAssignedToNodeFunc(), d.podFilter)
+	if err != nil {
+		return &frameworktypes.Status{
+			Err: fmt.Errorf("error listing all pods: %v", err),
 		}
-		namespacePods, err := client.CoreV1().Pods(namespace.Name).List(ctx, metav1.ListOptions{})
-		if err != nil {
-			klog.ErrorS(err, "Couldn't list pods in namespace", "namespace", namespace)
-			continue
+	}
+
+	namespacedPods := podutil.GroupByNamespace(pods)
+
+	// 1. for each namespace...
+	for namespace := range namespacedPods {
+		if (len(includedNamespaces) > 0 && !includedNamespaces.Has(namespace)) ||
+			(len(excludedNamespaces) > 0 && excludedNamespaces.Has(namespace)) {
+			continue
 		}
 
 		// ...where there is a topology constraint
 		namespaceTopologySpreadConstraints := []v1.TopologySpreadConstraint{}
-		for _, pod := range namespacePods.Items {
+		for _, pod := range namespacedPods[namespace] {
 			for _, constraint := range pod.Spec.TopologySpreadConstraints {
 				// Ignore soft topology constraints if they are not included
 				if constraint.WhenUnsatisfiable == v1.ScheduleAnyway && (d.args == nil || !d.args.IncludeSoftConstraints) {
@@ -172,18 +167,18 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) Balance(ctx context.Contex
 			// 3. for each evictable pod in that namespace
 			// (this loop is where we count the number of pods per topologyValue that match this constraint's selector)
 			var sumPods float64
-			for i := range namespacePods.Items {
+			for _, pod := range namespacedPods[namespace] {
 				// skip pods that are being deleted.
-				if utils.IsPodTerminating(&namespacePods.Items[i]) {
+				if utils.IsPodTerminating(pod) {
 					continue
 				}
 				// 4. if the pod matches this TopologySpreadConstraint LabelSelector
-				if !selector.Matches(labels.Set(namespacePods.Items[i].Labels)) {
+				if !selector.Matches(labels.Set(pod.Labels)) {
					continue
 				}
 
 				// 5. If the pod's node matches this constraint's topologyKey, create a topoPair and add the pod
-				node, ok := nodeMap[namespacePods.Items[i].Spec.NodeName]
+				node, ok := nodeMap[pod.Spec.NodeName]
 				if !ok {
 					// If ok is false, node is nil in which case node.Labels will panic. In which case a pod is yet to be scheduled. So it's safe to just continue here.
 					continue
@@ -195,7 +190,7 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) Balance(ctx context.Contex
 				// 6. create a topoPair with key as this TopologySpreadConstraint
 				topoPair := topologyPair{key: constraint.TopologyKey, value: nodeValue}
 				// 7. add the pod with key as this topoPair
-				constraintTopologies[topoPair] = append(constraintTopologies[topoPair], &namespacePods.Items[i])
+				constraintTopologies[topoPair] = append(constraintTopologies[topoPair], pod)
 				sumPods++
 			}
 			if topologyIsBalanced(constraintTopologies, constraint) {
@@ -3,6 +3,7 @@ package removepodsviolatingtopologyspreadconstraint
 import (
 	"context"
 	"fmt"
+	"sort"
 	"testing"
 
 	v1 "k8s.io/api/core/v1"
@@ -611,7 +612,7 @@ func TestTopologySpreadConstraint(t *testing.T) {
 				},
 			}),
 			expectedEvictedCount: 5,
-			expectedEvictedPods:  []string{"pod-5", "pod-6", "pod-7", "pod-8"},
+			expectedEvictedPods:  []string{"pod-5", "pod-6", "pod-7", "pod-8", "pod-9"},
 			namespaces:           []string{"ns1"},
 			args:                 RemovePodsViolatingTopologySpreadConstraintArgs{},
 		},
@@ -1137,11 +1138,21 @@ func TestTopologySpreadConstraint(t *testing.T) {
 			sharedInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
 			podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
 
-			getPodsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer)
+			podsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer)
 			if err != nil {
 				t.Errorf("Build get pods assigned to node function error: %v", err)
 			}
 
+			// workaround to ensure that pods are returned sorted so 'expectedEvictedPods' would work consistently
+			getPodsAssignedToNode := func(s string, filterFunc podutil.FilterFunc) ([]*v1.Pod, error) {
+				pods, err := podsAssignedToNode(s, filterFunc)
+				sort.Slice(pods, func(i, j int) bool {
+					return pods[i].Name < pods[j].Name
+				})
+
+				return pods, err
+			}
+
 			sharedInformerFactory.Start(ctx.Done())
 			sharedInformerFactory.WaitForCacheSync(ctx.Done())