nodeutilization: separate code responsible for requested resource extraction into a dedicated usage client

Turning the usage client into an interface makes it possible to implement
other kinds of usage clients, such as an actual-usage or Prometheus-based
resource collector.
Jan Chaloupka
2024-11-13 15:24:13 +01:00
parent 7b1178be9f
commit d1c64c48cd
4 changed files with 148 additions and 72 deletions
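The commit message names the extension point without showing it. As a minimal sketch of what a second implementation could look like, the stub below satisfies the new usageClient interface; the actualUsageClient type and its fetchNodeUsage field are hypothetical placeholders for a metrics-server or Prometheus backed collector, not code from this commit.

package nodeutilization

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// actualUsageClient is a hypothetical second implementation of the
// usageClient interface this commit introduces. Only the interface shape
// comes from the commit; fetchNodeUsage stands in for a metrics query.
type actualUsageClient struct {
	fetchNodeUsage   func(node string) (map[v1.ResourceName]*resource.Quantity, []*v1.Pod, error)
	_pods            map[string][]*v1.Pod
	_nodeUtilization map[string]map[v1.ResourceName]*resource.Quantity
}

var _ usageClient = &actualUsageClient{}

func (c *actualUsageClient) sync(nodes []*v1.Node) error {
	// Rebuild both caches on every sync, mirroring requestedUsageClient.
	c._nodeUtilization = map[string]map[v1.ResourceName]*resource.Quantity{}
	c._pods = map[string][]*v1.Pod{}
	for _, node := range nodes {
		usage, pods, err := c.fetchNodeUsage(node.Name)
		if err != nil {
			return fmt.Errorf("error collecting actual usage for node %q: %v", node.Name, err)
		}
		c._nodeUtilization[node.Name] = usage
		c._pods[node.Name] = pods
	}
	return nil
}

func (c *actualUsageClient) nodeUtilization(node string) map[v1.ResourceName]*resource.Quantity {
	return c._nodeUtilization[node]
}

func (c *actualUsageClient) pods(node string) []*v1.Pod { return c._pods[node] }

func (c *actualUsageClient) podUsage(pod *v1.Pod) (map[v1.ResourceName]*resource.Quantity, error) {
	// Placeholder: a real implementation would return measured usage; an
	// empty map keeps the sketch compilable.
	return map[v1.ResourceName]*resource.Quantity{}, nil
}

Only sync gathers data; the read methods remain cache lookups, matching the contract documented on the interface in the new file below.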

pkg/framework/plugins/nodeutilization/highnodeutilization.go

@@ -44,6 +44,7 @@ type HighNodeUtilization struct {
 	underutilizationCriteria []interface{}
 	resourceNames            []v1.ResourceName
 	targetThresholds         api.ResourceThresholds
+	usageClient              usageClient
 }

 var _ frameworktypes.BalancePlugin = &HighNodeUtilization{}
@@ -84,6 +85,7 @@ func NewHighNodeUtilization(args runtime.Object, handle frameworktypes.Handle) (
 		targetThresholds:         targetThresholds,
 		underutilizationCriteria: underutilizationCriteria,
 		podFilter:                podFilter,
+		usageClient:              newRequestedUsageClient(resourceNames, handle.GetPodsAssignedToNodeFunc()),
 	}, nil
 }
@@ -94,22 +96,15 @@ func (h *HighNodeUtilization) Name() string {
 // Balance extension point implementation for the plugin
 func (h *HighNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status {
-	nodeUsage, err := getNodeUsage(nodes, h.resourceNames, h.handle.GetPodsAssignedToNodeFunc())
-	if err != nil {
+	if err := h.usageClient.sync(nodes); err != nil {
 		return &frameworktypes.Status{
 			Err: fmt.Errorf("error getting node usage: %v", err),
 		}
 	}
-	thresholds, err := getNodeThresholds(nodes, h.args.Thresholds, h.targetThresholds, h.resourceNames, h.handle.GetPodsAssignedToNodeFunc(), false)
-	if err != nil {
-		return &frameworktypes.Status{
-			Err: fmt.Errorf("error getting node thresholds: %v", err),
-		}
-	}
 	sourceNodes, highNodes := classifyNodes(
-		nodeUsage,
-		thresholds,
+		getNodeUsage(nodes, h.usageClient),
+		getNodeThresholds(nodes, h.args.Thresholds, h.targetThresholds, h.resourceNames, false, h.usageClient),
 		func(node *v1.Node, usage NodeUsage, threshold NodeThresholds) bool {
 			return isNodeWithLowUtilization(usage, threshold.lowResourceThreshold)
 		},
@@ -165,7 +160,9 @@ func (h *HighNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *fr
 		evictions.EvictOptions{StrategyName: HighNodeUtilizationPluginName},
 		h.podFilter,
 		h.resourceNames,
-		continueEvictionCond)
+		continueEvictionCond,
+		h.usageClient,
+	)

 	return nil
 }

pkg/framework/plugins/nodeutilization/lownodeutilization.go

@@ -43,6 +43,7 @@ type LowNodeUtilization struct {
 	underutilizationCriteria []interface{}
 	overutilizationCriteria  []interface{}
 	resourceNames            []v1.ResourceName
+	usageClient              usageClient
 }

 var _ frameworktypes.BalancePlugin = &LowNodeUtilization{}
@@ -85,13 +86,16 @@ func NewLowNodeUtilization(args runtime.Object, handle frameworktypes.Handle) (f
 		return nil, fmt.Errorf("error initializing pod filter function: %v", err)
 	}

+	resourceNames := getResourceNames(lowNodeUtilizationArgsArgs.Thresholds)
+
 	return &LowNodeUtilization{
 		handle:                   handle,
 		args:                     lowNodeUtilizationArgsArgs,
 		underutilizationCriteria: underutilizationCriteria,
 		overutilizationCriteria:  overutilizationCriteria,
-		resourceNames:            getResourceNames(lowNodeUtilizationArgsArgs.Thresholds),
+		resourceNames:            resourceNames,
 		podFilter:                podFilter,
+		usageClient:              newRequestedUsageClient(resourceNames, handle.GetPodsAssignedToNodeFunc()),
 	}, nil
 }
@@ -102,22 +106,15 @@ func (l *LowNodeUtilization) Name() string {
 // Balance extension point implementation for the plugin
 func (l *LowNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status {
-	nodeUsage, err := getNodeUsage(nodes, l.resourceNames, l.handle.GetPodsAssignedToNodeFunc())
-	if err != nil {
+	if err := l.usageClient.sync(nodes); err != nil {
 		return &frameworktypes.Status{
 			Err: fmt.Errorf("error getting node usage: %v", err),
 		}
 	}
-	thresholds, err := getNodeThresholds(nodes, l.args.Thresholds, l.args.TargetThresholds, l.resourceNames, l.handle.GetPodsAssignedToNodeFunc(), l.args.UseDeviationThresholds)
-	if err != nil {
-		return &frameworktypes.Status{
-			Err: fmt.Errorf("error getting node thresholds: %v", err),
-		}
-	}
 	lowNodes, sourceNodes := classifyNodes(
-		nodeUsage,
-		thresholds,
+		getNodeUsage(nodes, l.usageClient),
+		getNodeThresholds(nodes, l.args.Thresholds, l.args.TargetThresholds, l.resourceNames, l.args.UseDeviationThresholds, l.usageClient),
 		// The node has to be schedulable (to be able to move workload there)
 		func(node *v1.Node, usage NodeUsage, threshold NodeThresholds) bool {
 			if nodeutil.IsNodeUnschedulable(node) {
@@ -185,7 +182,9 @@ func (l *LowNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *fra
 		evictions.EvictOptions{StrategyName: LowNodeUtilizationPluginName},
 		l.podFilter,
 		l.resourceNames,
-		continueEvictionCond)
+		continueEvictionCond,
+		l.usageClient,
+	)

 	return nil
 }
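With this change both Balance implementations follow the same preamble: sync the usage client once per run, then derive usage and thresholds from its cache. Below is a hypothetical helper, not part of this commit, that makes the shared flow explicit using only signatures visible in the hunks above.

package nodeutilization

import (
	"fmt"

	v1 "k8s.io/api/core/v1"

	"sigs.k8s.io/descheduler/pkg/api"
)

// prepare is a hypothetical helper capturing the preamble both Balance
// methods now share: one usage-client sync per invocation, then usage and
// threshold snapshots computed from the client's cache rather than from
// the informer directly.
func prepare(
	nodes []*v1.Node,
	lowThreshold, highThreshold api.ResourceThresholds,
	resourceNames []v1.ResourceName,
	useDeviationThresholds bool,
	client usageClient,
) ([]NodeUsage, map[string]NodeThresholds, error) {
	// sync must come first: getNodeUsage and getNodeThresholds read only
	// cached state and no longer return errors themselves.
	if err := client.sync(nodes); err != nil {
		return nil, nil, fmt.Errorf("error getting node usage: %v", err)
	}
	return getNodeUsage(nodes, client),
		getNodeThresholds(nodes, lowThreshold, highThreshold, resourceNames, useDeviationThresholds, client),
		nil
}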

pkg/framework/plugins/nodeutilization/nodeutilization.go

@@ -28,7 +28,6 @@ import (
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/klog/v2"
 	"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
-	"sigs.k8s.io/descheduler/pkg/descheduler/node"
 	nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
 	podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
 	frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
@@ -78,18 +77,14 @@ func getNodeThresholds(
 	nodes []*v1.Node,
 	lowThreshold, highThreshold api.ResourceThresholds,
 	resourceNames []v1.ResourceName,
-	getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc,
 	useDeviationThresholds bool,
-) (map[string]NodeThresholds, error) {
+	usageClient usageClient,
+) map[string]NodeThresholds {
 	nodeThresholdsMap := map[string]NodeThresholds{}

 	averageResourceUsagePercent := api.ResourceThresholds{}
 	if useDeviationThresholds {
-		usage, err := averageNodeBasicresources(nodes, getPodsAssignedToNode, resourceNames)
-		if err != nil {
-			return nil, err
-		}
-		averageResourceUsagePercent = usage
+		averageResourceUsagePercent = averageNodeBasicresources(nodes, usageClient)
 	}

 	for _, node := range nodes {
@@ -120,39 +115,24 @@
 		}
 	}

-	return nodeThresholdsMap, nil
+	return nodeThresholdsMap
 }

 func getNodeUsage(
 	nodes []*v1.Node,
-	resourceNames []v1.ResourceName,
-	getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc,
-) ([]NodeUsage, error) {
+	usageClient usageClient,
+) []NodeUsage {
 	var nodeUsageList []NodeUsage

 	for _, node := range nodes {
-		pods, err := podutil.ListPodsOnANode(node.Name, getPodsAssignedToNode, nil)
-		if err != nil {
-			klog.V(2).InfoS("Node will not be processed, error accessing its pods", "node", klog.KObj(node), "err", err)
-			continue
-		}
-
-		nodeUsage, err := nodeutil.NodeUtilization(pods, resourceNames, func(pod *v1.Pod) (v1.ResourceList, error) {
-			req, _ := utils.PodRequestsAndLimits(pod)
-			return req, nil
-		})
-		if err != nil {
-			return nil, err
-		}
-
 		nodeUsageList = append(nodeUsageList, NodeUsage{
 			node:    node,
-			usage:   nodeUsage,
-			allPods: pods,
+			usage:   usageClient.nodeUtilization(node.Name),
+			allPods: usageClient.pods(node.Name),
 		})
 	}

-	return nodeUsageList, nil
+	return nodeUsageList
 }

 func resourceThreshold(nodeCapacity v1.ResourceList, resourceName v1.ResourceName, threshold api.Percentage) *resource.Quantity {
@@ -239,7 +219,7 @@ func usageToKeysAndValues(usage map[v1.ResourceName]*resource.Quantity) []interf
 		keysAndValues = append(keysAndValues, "Pods", quantity.Value())
 	}
 	for name := range usage {
-		if !node.IsBasicResource(name) {
+		if !nodeutil.IsBasicResource(name) {
 			keysAndValues = append(keysAndValues, string(name), usage[name].Value())
 		}
 	}
@@ -258,6 +238,7 @@ func evictPodsFromSourceNodes(
 	podFilter func(pod *v1.Pod) bool,
 	resourceNames []v1.ResourceName,
 	continueEviction continueEvictionCond,
+	usageClient usageClient,
 ) {
 	// upper bound on total number of pods/cpu/memory and optional extended resources to be moved
 	totalAvailableUsage := map[v1.ResourceName]*resource.Quantity{}
@@ -270,6 +251,10 @@
 		taintsOfDestinationNodes[node.node.Name] = node.node.Spec.Taints

 		for _, name := range resourceNames {
+			if _, exists := node.usage[name]; !exists {
+				klog.Errorf("unable to find %q resource in node's %q usage, terminating eviction", name, node.node.Name)
+				return
+			}
 			if _, ok := totalAvailableUsage[name]; !ok {
 				totalAvailableUsage[name] = resource.NewQuantity(0, resource.DecimalSI)
 			}
@@ -295,7 +280,7 @@
 		klog.V(1).InfoS("Evicting pods based on priority, if they have same priority, they'll be evicted based on QoS tiers")
 		// sort the evictable Pods based on priority. This also sorts them based on QoS. If there are multiple pods with same priority, they are sorted based on QoS tiers.
 		podutil.SortPodsBasedOnPriorityLowToHigh(removablePods)
-		err := evictPods(ctx, evictableNamespaces, removablePods, node, totalAvailableUsage, taintsOfDestinationNodes, podEvictor, evictOptions, continueEviction)
+		err := evictPods(ctx, evictableNamespaces, removablePods, node, totalAvailableUsage, taintsOfDestinationNodes, podEvictor, evictOptions, continueEviction, usageClient)
 		if err != nil {
 			switch err.(type) {
 			case *evictions.EvictionTotalLimitError:
@@ -316,6 +301,7 @@ func evictPods(
 	podEvictor frameworktypes.Evictor,
 	evictOptions evictions.EvictOptions,
 	continueEviction continueEvictionCond,
+	usageClient usageClient,
 ) error {
 	var excludedNamespaces sets.Set[string]
 	if evictableNamespaces != nil {
@@ -341,6 +327,11 @@
 			if !preEvictionFilterWithOptions(pod) {
 				continue
 			}
+			podUsage, err := usageClient.podUsage(pod)
+			if err != nil {
+				klog.Errorf("unable to get pod usage for %v/%v: %v", pod.Namespace, pod.Name, err)
+				continue
+			}
 			err = podEvictor.Evict(ctx, pod, evictOptions)
 			if err == nil {
 				klog.V(3).InfoS("Evicted pods", "pod", klog.KObj(pod))
@@ -350,9 +341,8 @@
 					nodeInfo.usage[name].Sub(*resource.NewQuantity(1, resource.DecimalSI))
 					totalAvailableUsage[name].Sub(*resource.NewQuantity(1, resource.DecimalSI))
 				} else {
-					quantity := utils.GetResourceRequestQuantity(pod, name)
-					nodeInfo.usage[name].Sub(quantity)
-					totalAvailableUsage[name].Sub(quantity)
+					nodeInfo.usage[name].Sub(*podUsage[name])
+					totalAvailableUsage[name].Sub(*podUsage[name])
 				}
 			}
@@ -456,24 +446,12 @@ func classifyPods(pods []*v1.Pod, filter func(pod *v1.Pod) bool) ([]*v1.Pod, []*
 	return nonRemovablePods, removablePods
 }

-func averageNodeBasicresources(nodes []*v1.Node, getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc, resourceNames []v1.ResourceName) (api.ResourceThresholds, error) {
+func averageNodeBasicresources(nodes []*v1.Node, usageClient usageClient) api.ResourceThresholds {
 	total := api.ResourceThresholds{}
 	average := api.ResourceThresholds{}
 	numberOfNodes := len(nodes)
 	for _, node := range nodes {
-		pods, err := podutil.ListPodsOnANode(node.Name, getPodsAssignedToNode, nil)
-		if err != nil {
-			numberOfNodes--
-			continue
-		}
-		usage, err := nodeutil.NodeUtilization(pods, resourceNames, func(pod *v1.Pod) (v1.ResourceList, error) {
-			req, _ := utils.PodRequestsAndLimits(pod)
-			return req, nil
-		})
-		if err != nil {
-			return nil, err
-		}
+		usage := usageClient.nodeUtilization(node.Name)
 		nodeCapacity := node.Status.Capacity
 		if len(node.Status.Allocatable) > 0 {
 			nodeCapacity = node.Status.Allocatable
@@ -490,5 +468,5 @@ func averageNodeBasicresources(nodes []*v1.Node, getPodsAssignedToNode podutil.G
 	for resource, value := range total {
 		average[resource] = value / api.Percentage(numberOfNodes)
 	}
-	return average, nil
+	return average
 }
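Note the else branch in evictPods: instead of recomputing the pod's requests with utils.GetResourceRequestQuantity, it now subtracts whatever the usage client reported for the pod, so a future actual-usage client would feed measured values through the same bookkeeping. A standalone sketch of that arithmetic, with made-up quantities:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

// Illustration of the per-pod bookkeeping in evictPods: the pod's usage,
// as reported by usageClient.podUsage, is subtracted from both the node's
// usage and the remaining headroom on the destination nodes.
func main() {
	nodeUsage := resource.MustParse("1500m")     // node's current CPU usage
	totalAvailable := resource.MustParse("900m") // headroom left on destinations
	podUsage := resource.MustParse("100m")       // evicted pod's CPU usage

	nodeUsage.Sub(podUsage)
	totalAvailable.Sub(podUsage)

	fmt.Println(nodeUsage.String(), totalAvailable.String()) // 1400m 800m
}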

pkg/framework/plugins/nodeutilization/usageclients.go (new file)

@@ -0,0 +1,102 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package nodeutilization
+
+import (
+	"fmt"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	"k8s.io/klog/v2"
+	utilptr "k8s.io/utils/ptr"
+
+	nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
+	podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
+	"sigs.k8s.io/descheduler/pkg/utils"
+)
+
+type usageClient interface {
+	// Both low/high node utilization plugins are expected to invoke sync right
+	// after Balance method is invoked. There's no cache invalidation so each
+	// Balance is expected to get the latest data by invoking sync.
+	sync(nodes []*v1.Node) error
+	nodeUtilization(node string) map[v1.ResourceName]*resource.Quantity
+	pods(node string) []*v1.Pod
+	podUsage(pod *v1.Pod) (map[v1.ResourceName]*resource.Quantity, error)
+}
+
+type requestedUsageClient struct {
+	resourceNames         []v1.ResourceName
+	getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc
+
+	_pods            map[string][]*v1.Pod
+	_nodeUtilization map[string]map[v1.ResourceName]*resource.Quantity
+}
+
+var _ usageClient = &requestedUsageClient{}
+
+func newRequestedUsageClient(
+	resourceNames []v1.ResourceName,
+	getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc,
+) *requestedUsageClient {
+	return &requestedUsageClient{
+		resourceNames:         resourceNames,
+		getPodsAssignedToNode: getPodsAssignedToNode,
+	}
+}
+
+func (s *requestedUsageClient) nodeUtilization(node string) map[v1.ResourceName]*resource.Quantity {
+	return s._nodeUtilization[node]
+}
+
+func (s *requestedUsageClient) pods(node string) []*v1.Pod {
+	return s._pods[node]
+}
+
+func (s *requestedUsageClient) podUsage(pod *v1.Pod) (map[v1.ResourceName]*resource.Quantity, error) {
+	usage := make(map[v1.ResourceName]*resource.Quantity)
+	for _, resourceName := range s.resourceNames {
+		usage[resourceName] = utilptr.To[resource.Quantity](utils.GetResourceRequestQuantity(pod, resourceName).DeepCopy())
+	}
+	return usage, nil
+}
+
+func (s *requestedUsageClient) sync(nodes []*v1.Node) error {
+	s._nodeUtilization = make(map[string]map[v1.ResourceName]*resource.Quantity)
+	s._pods = make(map[string][]*v1.Pod)
+
+	for _, node := range nodes {
+		pods, err := podutil.ListPodsOnANode(node.Name, s.getPodsAssignedToNode, nil)
+		if err != nil {
+			klog.V(2).InfoS("Node will not be processed, error accessing its pods", "node", klog.KObj(node), "err", err)
+			return fmt.Errorf("error accessing %q node's pods: %v", node.Name, err)
+		}
+
+		nodeUsage, err := nodeutil.NodeUtilization(pods, s.resourceNames, func(pod *v1.Pod) (v1.ResourceList, error) {
+			req, _ := utils.PodRequestsAndLimits(pod)
+			return req, nil
+		})
+		if err != nil {
+			return err
+		}
+
+		// store the snapshot of pods from the same (or the closest) node utilization computation
+		s._pods[node.Name] = pods
+		s._nodeUtilization[node.Name] = nodeUsage
+	}
+
+	return nil
+}
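Finally, a test-style sketch of the intended call sequence against the new client: construct it with an indexer lookup, sync once at the start of Balance, then read the cached per-node view. The fake getPods function and the pod fixture are illustrative assumptions, not descheduler test helpers.

package nodeutilization

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
)

// demoRequestedUsageClient exercises the construct / sync / read sequence.
// The fixtures below are assumptions for illustration only.
func demoRequestedUsageClient() error {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "p1", Namespace: "default"},
		Spec: v1.PodSpec{
			NodeName: "n1",
			Containers: []v1.Container{{
				Name: "c",
				Resources: v1.ResourceRequirements{
					Requests: v1.ResourceList{
						v1.ResourceCPU:    resource.MustParse("100m"),
						v1.ResourceMemory: resource.MustParse("128Mi"),
					},
				},
			}},
		},
	}
	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n1"}}

	// Fake indexer lookup standing in for handle.GetPodsAssignedToNodeFunc().
	getPods := func(nodeName string, filter podutil.FilterFunc) ([]*v1.Pod, error) {
		var out []*v1.Pod
		if pod.Spec.NodeName == nodeName && (filter == nil || filter(pod)) {
			out = append(out, pod)
		}
		return out, nil
	}

	client := newRequestedUsageClient([]v1.ResourceName{v1.ResourceCPU, v1.ResourceMemory}, getPods)
	if err := client.sync([]*v1.Node{node}); err != nil {
		return err
	}

	cpu := client.nodeUtilization("n1")[v1.ResourceCPU]
	fmt.Println(cpu.MilliValue()) // 100
	return nil
}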