topologyspreadconstraint: refactor to match scheduler's struct and initialization
Signed-off-by: Amir Alavi <amiralavi7@gmail.com>
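
In short, this change replaces the plugin's ad-hoc topologyConstraintSet wrapper with an internal topologySpreadConstraint struct modeled on the scheduler's podtopologyspread plugin: the label selector (including matchLabelKeys) is parsed once at construction, and nil node-inclusion policies are resolved to their defaults up front. A condensed sketch of the new type, paraphrased from the diff below (illustrative only, not the complete plugin source):

	// Internal, pre-processed form of v1.TopologySpreadConstraint.
	type topologySpreadConstraint struct {
		maxSkew            int32
		topologyKey        string
		selector           labels.Selector                   // parsed once from LabelSelector (+ matchLabelKeys)
		nodeAffinityPolicy v1.NodeInclusionPolicy            // nil in the API is resolved to Honor
		nodeTaintsPolicy   v1.NodeInclusionPolicy            // nil in the API is resolved to Ignore
		podNodeAffinity    nodeaffinity.RequiredNodeAffinity // required node affinity of the owning pod
		podTolerations     []v1.Toleration                   // tolerations of the owning pod
	}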
@@ -29,7 +29,7 @@ import (
 	utilpointer "k8s.io/utils/pointer"

 	v1helper "k8s.io/component-helpers/scheduling/corev1"
-	nodeaffinity "k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
+	"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
 	"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
 	"sigs.k8s.io/descheduler/pkg/descheduler/node"
 	podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
@@ -50,6 +50,19 @@ type topology struct {
 	pods []*v1.Pod
 }

+// topologySpreadConstraint is an internal version for v1.TopologySpreadConstraint
+// and where the selector is parsed.
+// This mirrors scheduler: https://github.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/framework/plugins/podtopologyspread/common.go#L37
+type topologySpreadConstraint struct {
+	maxSkew            int32
+	topologyKey        string
+	selector           labels.Selector
+	nodeAffinityPolicy v1.NodeInclusionPolicy
+	nodeTaintsPolicy   v1.NodeInclusionPolicy
+	podNodeAffinity    nodeaffinity.RequiredNodeAffinity
+	podTolerations     []v1.Toleration
+}
+
 // RemovePodsViolatingTopologySpreadConstraint evicts pods which violate their topology spread constraints
 type RemovePodsViolatingTopologySpreadConstraint struct {
 	handle frameworktypes.Handle
@@ -81,12 +94,6 @@ func New(args runtime.Object, handle frameworktypes.Handle) (frameworktypes.Plug
 	}, nil
 }

-type topologyConstraintSet struct {
-	constraint      v1.TopologySpreadConstraint
-	podNodeAffinity nodeaffinity.RequiredNodeAffinity
-	podTolerations  []v1.Toleration
-}
-
 // Name retrieves the plugin name
 func (d *RemovePodsViolatingTopologySpreadConstraint) Name() string {
 	return PluginName
@@ -140,7 +147,7 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) Balance(ctx context.Contex
 	}

 	// ...where there is a topology constraint
-	namespaceTopologySpreadConstraints := []topologyConstraintSet{}
+	var namespaceTopologySpreadConstraints []topologySpreadConstraint
 	for _, pod := range namespacedPods[namespace] {
 		for _, constraint := range pod.Spec.TopologySpreadConstraints {
 			// Ignore topology constraints if they are not included
@@ -148,10 +155,14 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) Balance(ctx context.Contex
 			continue
 		}

-		namespaceTopologySpreadConstraint := newTopologyConstraintSet(constraint, pod)
+		namespaceTopologySpreadConstraint, err := newTopologySpreadConstraint(constraint, pod)
+		if err != nil {
+			klog.ErrorS(err, "cannot process topology spread constraint")
+			continue
+		}

-		// Need to check v1.TopologySpreadConstraint deepEquality because
-		// v1.TopologySpreadConstraint has pointer fields
+		// Need to check TopologySpreadConstraint deepEquality because
+		// TopologySpreadConstraint can have pointer fields
 		// and we don't need to go over duplicated constraints later on
 		if hasIdenticalConstraints(namespaceTopologySpreadConstraint, namespaceTopologySpreadConstraints) {
 			continue
@@ -164,27 +175,18 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) Balance(ctx context.Contex
 	}

 	// 2. for each topologySpreadConstraint in that namespace
-	for _, constraintSet := range namespaceTopologySpreadConstraints {
-		constraint := constraintSet.constraint
-		nodeAffinity := constraintSet.podNodeAffinity
-		tolerations := constraintSet.podTolerations
+	for _, tsc := range namespaceTopologySpreadConstraints {
 		constraintTopologies := make(map[topologyPair][]*v1.Pod)
 		// pre-populate the topologyPair map with all the topologies available from the nodeMap
 		// (we can't just build it from existing pods' nodes because a topology may have 0 pods)
 		for _, node := range nodeMap {
-			if val, ok := node.Labels[constraint.TopologyKey]; ok {
-				if matchNodeInclusionPolicies(&constraint, tolerations, node, nodeAffinity) {
-					constraintTopologies[topologyPair{key: constraint.TopologyKey, value: val}] = make([]*v1.Pod, 0)
+			if val, ok := node.Labels[tsc.topologyKey]; ok {
+				if matchNodeInclusionPolicies(tsc, node) {
+					constraintTopologies[topologyPair{key: tsc.topologyKey, value: val}] = make([]*v1.Pod, 0)
 				}
 			}
 		}

-		selector, err := metav1.LabelSelectorAsSelector(constraint.LabelSelector)
-		if err != nil {
-			klog.ErrorS(err, "Couldn't parse label selector as selector", "selector", constraint.LabelSelector)
-			continue
-		}
-
 		// 3. for each evictable pod in that namespace
 		// (this loop is where we count the number of pods per topologyValue that match this constraint's selector)
 		var sumPods float64
@@ -194,7 +196,7 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) Balance(ctx context.Contex
 				continue
 			}
 			// 4. if the pod matches this TopologySpreadConstraint LabelSelector
-			if !selector.Matches(labels.Set(pod.Labels)) {
+			if !tsc.selector.Matches(labels.Set(pod.Labels)) {
 				continue
 			}

@@ -204,21 +206,21 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) Balance(ctx context.Contex
 				// If ok is false, node is nil in which case node.Labels will panic. In which case a pod is yet to be scheduled. So it's safe to just continue here.
 				continue
 			}
-			nodeValue, ok := node.Labels[constraint.TopologyKey]
+			nodeValue, ok := node.Labels[tsc.topologyKey]
 			if !ok {
 				continue
 			}
 			// 6. create a topoPair with key as this TopologySpreadConstraint
-			topoPair := topologyPair{key: constraint.TopologyKey, value: nodeValue}
+			topoPair := topologyPair{key: tsc.topologyKey, value: nodeValue}
 			// 7. add the pod with key as this topoPair
 			constraintTopologies[topoPair] = append(constraintTopologies[topoPair], pod)
 			sumPods++
 		}
-		if topologyIsBalanced(constraintTopologies, constraint) {
-			klog.V(2).InfoS("Skipping topology constraint because it is already balanced", "constraint", constraint)
+		if topologyIsBalanced(constraintTopologies, tsc) {
+			klog.V(2).InfoS("Skipping topology constraint because it is already balanced", "constraint", tsc)
 			continue
 		}
-		d.balanceDomains(podsForEviction, constraintSet, constraintTopologies, sumPods, nodes)
+		d.balanceDomains(podsForEviction, tsc, constraintTopologies, sumPods, nodes)
 	}
 }

@@ -243,7 +245,7 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) Balance(ctx context.Contex
 }

 // hasIdenticalConstraints checks if we already had an identical TopologySpreadConstraint in namespaceTopologySpreadConstraints slice
-func hasIdenticalConstraints(newConstraint topologyConstraintSet, namespaceTopologySpreadConstraints []topologyConstraintSet) bool {
+func hasIdenticalConstraints(newConstraint topologySpreadConstraint, namespaceTopologySpreadConstraints []topologySpreadConstraint) bool {
 	for _, constraint := range namespaceTopologySpreadConstraints {
 		if reflect.DeepEqual(newConstraint, constraint) {
 			return true
@@ -254,7 +256,7 @@ func hasIdenticalConstraints(newConstraint topologyConstraintSet, namespaceTopol

 // topologyIsBalanced checks if any domains in the topology differ by more than the MaxSkew
 // this is called before any sorting or other calculations and is used to skip topologies that don't need to be balanced
-func topologyIsBalanced(topology map[topologyPair][]*v1.Pod, constraint v1.TopologySpreadConstraint) bool {
+func topologyIsBalanced(topology map[topologyPair][]*v1.Pod, tsc topologySpreadConstraint) bool {
 	minDomainSize := math.MaxInt32
 	maxDomainSize := math.MinInt32
 	for _, pods := range topology {
@@ -264,7 +266,7 @@ func topologyIsBalanced(topology map[topologyPair][]*v1.Pod, constraint v1.Topol
 		if len(pods) > maxDomainSize {
 			maxDomainSize = len(pods)
 		}
-		if int32(maxDomainSize-minDomainSize) > constraint.MaxSkew {
+		if int32(maxDomainSize-minDomainSize) > tsc.maxSkew {
 			return false
 		}
 	}
@@ -293,20 +295,19 @@ func topologyIsBalanced(topology map[topologyPair][]*v1.Pod, constraint v1.Topol
 // (assuming even distribution by the scheduler of the evicted pods)
 func (d *RemovePodsViolatingTopologySpreadConstraint) balanceDomains(
 	podsForEviction map[*v1.Pod]struct{},
-	constraintSet topologyConstraintSet,
+	tsc topologySpreadConstraint,
 	constraintTopologies map[topologyPair][]*v1.Pod,
 	sumPods float64,
 	nodes []*v1.Node,
 ) {
-	constraint := constraintSet.constraint
 	idealAvg := sumPods / float64(len(constraintTopologies))
 	isEvictable := d.handle.Evictor().Filter
 	sortedDomains := sortDomains(constraintTopologies, isEvictable)
 	getPodsAssignedToNode := d.handle.GetPodsAssignedToNodeFunc()
 	topologyBalanceNodeFit := utilpointer.BoolDeref(d.args.TopologyBalanceNodeFit, true)

-	eligibleNodes := filterEligibleNodes(nodes, constraintSet)
-	nodesBelowIdealAvg := filterNodesBelowIdealAvg(eligibleNodes, sortedDomains, constraint.TopologyKey, idealAvg)
+	eligibleNodes := filterEligibleNodes(nodes, tsc)
+	nodesBelowIdealAvg := filterNodesBelowIdealAvg(eligibleNodes, sortedDomains, tsc.topologyKey, idealAvg)

 	// i is the index for belowOrEqualAvg
 	// j is the index for aboveAvg
@@ -322,7 +323,7 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) balanceDomains(
 		skew := float64(len(sortedDomains[j].pods) - len(sortedDomains[i].pods))

 		// if k and j are within the maxSkew of each other, move to next belowOrEqualAvg
-		if int32(skew) <= constraint.MaxSkew {
+		if int32(skew) <= tsc.maxSkew {
 			i++
 			continue
 		}
@@ -336,7 +337,7 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) balanceDomains(
 		aboveAvg := math.Ceil(float64(len(sortedDomains[j].pods)) - idealAvg)
 		belowAvg := math.Ceil(idealAvg - float64(len(sortedDomains[i].pods)))
 		smallestDiff := math.Min(aboveAvg, belowAvg)
-		halfSkew := math.Ceil((skew - float64(constraint.MaxSkew)) / 2)
+		halfSkew := math.Ceil((skew - float64(tsc.maxSkew)) / 2)
 		movePods := int(math.Min(smallestDiff, halfSkew))
 		if movePods <= 0 {
 			i++
@@ -463,52 +464,82 @@ func doNotScheduleTaintsFilterFunc() func(t *v1.Taint) bool {
 	}
 }

-func filterEligibleNodes(nodes []*v1.Node, constraintSet topologyConstraintSet) []*v1.Node {
-	constraint := constraintSet.constraint
-	nodeAffinity := constraintSet.podNodeAffinity
-	tolerations := constraintSet.podTolerations
+func filterEligibleNodes(nodes []*v1.Node, tsc topologySpreadConstraint) []*v1.Node {
 	var eligibleNodes []*v1.Node
 	for _, node := range nodes {
-		if matchNodeInclusionPolicies(&constraint, tolerations, node, nodeAffinity) {
+		if matchNodeInclusionPolicies(tsc, node) {
 			eligibleNodes = append(eligibleNodes, node)
 		}
 	}
 	return eligibleNodes
 }

-func matchNodeInclusionPolicies(tsc *v1.TopologySpreadConstraint, tolerations []v1.Toleration, node *v1.Node, require nodeaffinity.RequiredNodeAffinity) bool {
-	// Nil is equivalent to honor
-	if tsc.NodeAffinityPolicy == nil || *tsc.NodeAffinityPolicy == v1.NodeInclusionPolicyHonor {
+func matchNodeInclusionPolicies(tsc topologySpreadConstraint, node *v1.Node) bool {
+	if tsc.nodeAffinityPolicy == v1.NodeInclusionPolicyHonor {
 		// We ignore parsing errors here for backwards compatibility.
-		if match, _ := require.Match(node); !match {
+		if match, _ := tsc.podNodeAffinity.Match(node); !match {
 			return false
 		}
 	}

-	// Nil is equivalent to ignore
-	if tsc.NodeTaintsPolicy != nil && *tsc.NodeTaintsPolicy == v1.NodeInclusionPolicyHonor {
-		if _, untolerated := v1helper.FindMatchingUntoleratedTaint(node.Spec.Taints, tolerations, doNotScheduleTaintsFilterFunc()); untolerated {
+	if tsc.nodeTaintsPolicy == v1.NodeInclusionPolicyHonor {
+		if _, untolerated := v1helper.FindMatchingUntoleratedTaint(node.Spec.Taints, tsc.podTolerations, doNotScheduleTaintsFilterFunc()); untolerated {
 			return false
 		}
 	}
 	return true
 }

-func newTopologyConstraintSet(constraint v1.TopologySpreadConstraint, pod *v1.Pod) topologyConstraintSet {
-	if pod.Labels != nil && len(constraint.MatchLabelKeys) > 0 {
-		if constraint.LabelSelector == nil {
-			constraint.LabelSelector = &metav1.LabelSelector{}
-		}
+// inspired by Scheduler: https://github.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/framework/plugins/podtopologyspread/common.go#L90
+func newTopologySpreadConstraint(constraint v1.TopologySpreadConstraint, pod *v1.Pod) (topologySpreadConstraint, error) {
+	selector, err := metav1.LabelSelectorAsSelector(constraint.LabelSelector)
+	if err != nil {
+		return topologySpreadConstraint{}, err
+	}
+
+	if len(constraint.MatchLabelKeys) > 0 && pod.Labels != nil {
+		matchLabels := make(labels.Set)
 		for _, labelKey := range constraint.MatchLabelKeys {
-			metav1.AddLabelToSelector(constraint.LabelSelector, labelKey, pod.Labels[labelKey])
+			if value, ok := pod.Labels[labelKey]; ok {
+				matchLabels[labelKey] = value
+			}
 		}
+		if len(matchLabels) > 0 {
+			selector = mergeLabelSetWithSelector(matchLabels, selector)
+		}
 	}

-	requiredSchedulingTerm := nodeaffinity.GetRequiredNodeAffinity(pod)
-	return topologyConstraintSet{
-		constraint:      constraint,
-		podNodeAffinity: requiredSchedulingTerm,
-		podTolerations:  pod.Spec.Tolerations,
+	tsc := topologySpreadConstraint{
+		maxSkew:            constraint.MaxSkew,
+		topologyKey:        constraint.TopologyKey,
+		selector:           selector,
+		nodeAffinityPolicy: v1.NodeInclusionPolicyHonor,  // If NodeAffinityPolicy is nil, we treat NodeAffinityPolicy as "Honor".
+		nodeTaintsPolicy:   v1.NodeInclusionPolicyIgnore, // If NodeTaintsPolicy is nil, we treat NodeTaintsPolicy as "Ignore".
+		podNodeAffinity:    nodeaffinity.GetRequiredNodeAffinity(pod),
+		podTolerations:     pod.Spec.Tolerations,
 	}
+	if constraint.NodeAffinityPolicy != nil {
+		tsc.nodeAffinityPolicy = *constraint.NodeAffinityPolicy
+	}
+	if constraint.NodeTaintsPolicy != nil {
+		tsc.nodeTaintsPolicy = *constraint.NodeTaintsPolicy
+	}
+
+	return tsc, nil
+}
+
+// Scheduler: https://github.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/framework/plugins/podtopologyspread/common.go#L136
+func mergeLabelSetWithSelector(matchLabels labels.Set, s labels.Selector) labels.Selector {
+	mergedSelector := labels.SelectorFromSet(matchLabels)
+
+	requirements, ok := s.Requirements()
+	if !ok {
+		return s
+	}
+
+	for _, r := range requirements {
+		mergedSelector = mergedSelector.Add(r)
+	}
+
+	return mergedSelector
+}

@@ -16,7 +16,6 @@ import (
 	"k8s.io/client-go/kubernetes/fake"
 	core "k8s.io/client-go/testing"
 	"k8s.io/client-go/tools/events"
-	"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
 	utilpointer "k8s.io/utils/pointer"

 	"sigs.k8s.io/descheduler/pkg/api"
@@ -1606,30 +1605,29 @@ func getDefaultTopologyConstraintsWithPodTemplateHashMatch(maxSkew int32) []v1.T
 }

 func TestCheckIdenticalConstraints(t *testing.T) {
-	newConstraintSame := v1.TopologySpreadConstraint{
-		MaxSkew:           2,
-		TopologyKey:       "zone",
-		WhenUnsatisfiable: v1.DoNotSchedule,
-		LabelSelector:     &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}},
+	selector, _ := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}})
+
+	newConstraintSame := topologySpreadConstraint{
+		maxSkew:     2,
+		topologyKey: "zone",
+		selector:    selector.DeepCopySelector(),
 	}
-	newConstraintDifferent := v1.TopologySpreadConstraint{
-		MaxSkew:           3,
-		TopologyKey:       "node",
-		WhenUnsatisfiable: v1.DoNotSchedule,
-		LabelSelector:     &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}},
+	newConstraintDifferent := topologySpreadConstraint{
+		maxSkew:     3,
+		topologyKey: "node",
+		selector:    selector.DeepCopySelector(),
 	}
-	namespaceTopologySpreadConstraint := []v1.TopologySpreadConstraint{
+	namespaceTopologySpreadConstraint := []topologySpreadConstraint{
 		{
-			MaxSkew:           2,
-			TopologyKey:       "zone",
-			WhenUnsatisfiable: v1.DoNotSchedule,
-			LabelSelector:     &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}},
+			maxSkew:     2,
+			topologyKey: "zone",
+			selector:    selector.DeepCopySelector(),
 		},
 	}
 	testCases := []struct {
 		name                               string
-		namespaceTopologySpreadConstraints []v1.TopologySpreadConstraint
-		newConstraint                      v1.TopologySpreadConstraint
+		namespaceTopologySpreadConstraints []topologySpreadConstraint
+		newConstraint                      topologySpreadConstraint
 		expectedResult                     bool
 	}{
 		{
@@ -1647,19 +1645,7 @@ func TestCheckIdenticalConstraints(t *testing.T) {
 	}
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
-			var constraintSets []topologyConstraintSet
-			for _, constraints := range tc.namespaceTopologySpreadConstraints {
-				constraintSets = append(constraintSets, topologyConstraintSet{
-					constraint:      constraints,
-					podNodeAffinity: nodeaffinity.RequiredNodeAffinity{},
-					podTolerations:  []v1.Toleration{},
-				})
-			}
-			isIdentical := hasIdenticalConstraints(topologyConstraintSet{
-				constraint:      tc.newConstraint,
-				podNodeAffinity: nodeaffinity.RequiredNodeAffinity{},
-				podTolerations:  []v1.Toleration{},
-			}, constraintSets)
+			isIdentical := hasIdenticalConstraints(tc.newConstraint, tc.namespaceTopologySpreadConstraints)
 			if isIdentical != tc.expectedResult {
 				t.Errorf("Test error for description: %s. Expected result %v, got %v", tc.name, tc.expectedResult, isIdentical)
 			}
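
For context on the matchLabelKeys handling: mergeLabelSetWithSelector folds the labels captured via matchLabelKeys into the constraint's own parsed selector, so a single selector is matched against pods. A minimal, self-contained sketch (the merge function body is taken from the diff; the main function and sample labels are illustrative assumptions):

	package main

	import (
		"fmt"

		"k8s.io/apimachinery/pkg/labels"
	)

	// From the diff: combine the requirements of an existing selector
	// with a plain label set derived from matchLabelKeys.
	func mergeLabelSetWithSelector(matchLabels labels.Set, s labels.Selector) labels.Selector {
		mergedSelector := labels.SelectorFromSet(matchLabels)

		requirements, ok := s.Requirements()
		if !ok {
			return s
		}

		for _, r := range requirements {
			mergedSelector = mergedSelector.Add(r)
		}

		return mergedSelector
	}

	func main() {
		// The constraint's own selector...
		base := labels.SelectorFromSet(labels.Set{"app": "web"})
		// ...merged with a label picked up via matchLabelKeys (e.g. pod-template-hash).
		merged := mergeLabelSetWithSelector(labels.Set{"pod-template-hash": "abc123"}, base)

		fmt.Println(merged.Matches(labels.Set{"app": "web", "pod-template-hash": "abc123"})) // true
		fmt.Println(merged.Matches(labels.Set{"app": "web"}))                                // false
	}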