From d1c64c48cd7857a4f9616c47d764c253907b5d55 Mon Sep 17 00:00:00 2001
From: Jan Chaloupka
Date: Wed, 13 Nov 2024 15:24:13 +0100
Subject: [PATCH] nodeutilization: separate code responsible for requested
 resource extraction into a dedicated usage client

Turning the usage client into an interface makes it possible to implement
other kinds of usage clients, such as actual-usage or Prometheus-based
resource collection.
---
 .../nodeutilization/highnodeutilization.go    |  19 ++--
 .../nodeutilization/lownodeutilization.go     |  23 ++--
 .../nodeutilization/nodeutilization.go        |  76 +++++--------
 .../plugins/nodeutilization/usageclients.go   | 102 ++++++++++++++++++
 4 files changed, 148 insertions(+), 72 deletions(-)
 create mode 100644 pkg/framework/plugins/nodeutilization/usageclients.go

diff --git a/pkg/framework/plugins/nodeutilization/highnodeutilization.go b/pkg/framework/plugins/nodeutilization/highnodeutilization.go
index c07cedacd..dcfa44bdf 100644
--- a/pkg/framework/plugins/nodeutilization/highnodeutilization.go
+++ b/pkg/framework/plugins/nodeutilization/highnodeutilization.go
@@ -44,6 +44,7 @@ type HighNodeUtilization struct {
 	underutilizationCriteria []interface{}
 	resourceNames            []v1.ResourceName
 	targetThresholds         api.ResourceThresholds
+	usageClient              usageClient
 }
 
 var _ frameworktypes.BalancePlugin = &HighNodeUtilization{}
@@ -84,6 +85,7 @@ func NewHighNodeUtilization(args runtime.Object, handle frameworktypes.Handle) (
 		targetThresholds:         targetThresholds,
 		underutilizationCriteria: underutilizationCriteria,
 		podFilter:                podFilter,
+		usageClient:              newRequestedUsageClient(resourceNames, handle.GetPodsAssignedToNodeFunc()),
 	}, nil
 }
 
@@ -94,22 +96,15 @@ func (h *HighNodeUtilization) Name() string {
 
 // Balance extension point implementation for the plugin
 func (h *HighNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status {
-	nodeUsage, err := getNodeUsage(nodes, h.resourceNames, h.handle.GetPodsAssignedToNodeFunc())
-	if err != nil {
+	if err := h.usageClient.sync(nodes); err != nil {
 		return &frameworktypes.Status{
 			Err: fmt.Errorf("error getting node usage: %v", err),
 		}
 	}
-	thresholds, err := getNodeThresholds(nodes, h.args.Thresholds, h.targetThresholds, h.resourceNames, h.handle.GetPodsAssignedToNodeFunc(), false)
-	if err != nil {
-		return &frameworktypes.Status{
-			Err: fmt.Errorf("error getting node thresholds: %v", err),
-		}
-	}
 
 	sourceNodes, highNodes := classifyNodes(
-		nodeUsage,
-		thresholds,
+		getNodeUsage(nodes, h.usageClient),
+		getNodeThresholds(nodes, h.args.Thresholds, h.targetThresholds, h.resourceNames, false, h.usageClient),
 		func(node *v1.Node, usage NodeUsage, threshold NodeThresholds) bool {
 			return isNodeWithLowUtilization(usage, threshold.lowResourceThreshold)
 		},
@@ -165,7 +160,9 @@ func (h *HighNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *fr
 		evictions.EvictOptions{StrategyName: HighNodeUtilizationPluginName},
 		h.podFilter,
 		h.resourceNames,
-		continueEvictionCond)
+		continueEvictionCond,
+		h.usageClient,
+	)
 
 	return nil
 }
diff --git a/pkg/framework/plugins/nodeutilization/lownodeutilization.go b/pkg/framework/plugins/nodeutilization/lownodeutilization.go
index 3337c750a..df3b84743 100644
--- a/pkg/framework/plugins/nodeutilization/lownodeutilization.go
+++ b/pkg/framework/plugins/nodeutilization/lownodeutilization.go
@@ -43,6 +43,7 @@ type LowNodeUtilization struct {
 	underutilizationCriteria []interface{}
 	overutilizationCriteria  []interface{}
 	resourceNames            []v1.ResourceName
+	usageClient              usageClient
 }
 
 var _ frameworktypes.BalancePlugin = &LowNodeUtilization{}
@@ -85,13 +86,16 @@ func NewLowNodeUtilization(args runtime.Object, handle frameworktypes.Handle) (f
 		return nil, fmt.Errorf("error initializing pod filter function: %v", err)
 	}
 
+	resourceNames := getResourceNames(lowNodeUtilizationArgsArgs.Thresholds)
+
 	return &LowNodeUtilization{
 		handle:                   handle,
 		args:                     lowNodeUtilizationArgsArgs,
 		underutilizationCriteria: underutilizationCriteria,
 		overutilizationCriteria:  overutilizationCriteria,
-		resourceNames:            getResourceNames(lowNodeUtilizationArgsArgs.Thresholds),
+		resourceNames:            resourceNames,
 		podFilter:                podFilter,
+		usageClient:              newRequestedUsageClient(resourceNames, handle.GetPodsAssignedToNodeFunc()),
 	}, nil
 }
 
@@ -102,22 +106,15 @@ func (l *LowNodeUtilization) Name() string {
 
 // Balance extension point implementation for the plugin
 func (l *LowNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status {
-	nodeUsage, err := getNodeUsage(nodes, l.resourceNames, l.handle.GetPodsAssignedToNodeFunc())
-	if err != nil {
+	if err := l.usageClient.sync(nodes); err != nil {
 		return &frameworktypes.Status{
 			Err: fmt.Errorf("error getting node usage: %v", err),
 		}
 	}
-	thresholds, err := getNodeThresholds(nodes, l.args.Thresholds, l.args.TargetThresholds, l.resourceNames, l.handle.GetPodsAssignedToNodeFunc(), l.args.UseDeviationThresholds)
-	if err != nil {
-		return &frameworktypes.Status{
-			Err: fmt.Errorf("error getting node thresholds: %v", err),
-		}
-	}
 
 	lowNodes, sourceNodes := classifyNodes(
-		nodeUsage,
-		thresholds,
+		getNodeUsage(nodes, l.usageClient),
+		getNodeThresholds(nodes, l.args.Thresholds, l.args.TargetThresholds, l.resourceNames, l.args.UseDeviationThresholds, l.usageClient),
 		// The node has to be schedulable (to be able to move workload there)
 		func(node *v1.Node, usage NodeUsage, threshold NodeThresholds) bool {
 			if nodeutil.IsNodeUnschedulable(node) {
@@ -185,7 +182,9 @@ func (l *LowNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *fra
 		evictions.EvictOptions{StrategyName: LowNodeUtilizationPluginName},
 		l.podFilter,
 		l.resourceNames,
-		continueEvictionCond)
+		continueEvictionCond,
+		l.usageClient,
+	)
 
 	return nil
 }
diff --git a/pkg/framework/plugins/nodeutilization/nodeutilization.go b/pkg/framework/plugins/nodeutilization/nodeutilization.go
index 7f1fea60e..a694cacc8 100644
--- a/pkg/framework/plugins/nodeutilization/nodeutilization.go
+++ b/pkg/framework/plugins/nodeutilization/nodeutilization.go
@@ -28,7 +28,6 @@ import (
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/klog/v2"
 	"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
-	"sigs.k8s.io/descheduler/pkg/descheduler/node"
 	nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
 	podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
 	frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
@@ -78,18 +77,14 @@ func getNodeThresholds(
 	nodes []*v1.Node,
 	lowThreshold, highThreshold api.ResourceThresholds,
 	resourceNames []v1.ResourceName,
-	getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc,
 	useDeviationThresholds bool,
-) (map[string]NodeThresholds, error) {
+	usageClient usageClient,
+) map[string]NodeThresholds {
 	nodeThresholdsMap := map[string]NodeThresholds{}
 
 	averageResourceUsagePercent := api.ResourceThresholds{}
 	if useDeviationThresholds {
-		usage, err := averageNodeBasicresources(nodes, getPodsAssignedToNode, resourceNames)
-		if err != nil {
-			return nil, err
-		}
-		averageResourceUsagePercent = usage
+		averageResourceUsagePercent = averageNodeBasicresources(nodes, usageClient)
 	}
 
 	for _, node := range nodes {
@@ -120,39 +115,24 @@ func getNodeThresholds(
 		}
 	}
 
-	return nodeThresholdsMap, nil
+	return nodeThresholdsMap
 }
 
 func getNodeUsage(
 	nodes []*v1.Node,
-	resourceNames []v1.ResourceName,
-	getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc,
-) ([]NodeUsage, error) {
+	usageClient usageClient,
+) []NodeUsage {
 	var nodeUsageList []NodeUsage
 
 	for _, node := range nodes {
-		pods, err := podutil.ListPodsOnANode(node.Name, getPodsAssignedToNode, nil)
-		if err != nil {
-			klog.V(2).InfoS("Node will not be processed, error accessing its pods", "node", klog.KObj(node), "err", err)
-			continue
-		}
-
-		nodeUsage, err := nodeutil.NodeUtilization(pods, resourceNames, func(pod *v1.Pod) (v1.ResourceList, error) {
-			req, _ := utils.PodRequestsAndLimits(pod)
-			return req, nil
-		})
-		if err != nil {
-			return nil, err
-		}
-
 		nodeUsageList = append(nodeUsageList, NodeUsage{
 			node:    node,
-			usage:   nodeUsage,
-			allPods: pods,
+			usage:   usageClient.nodeUtilization(node.Name),
+			allPods: usageClient.pods(node.Name),
 		})
 	}
 
-	return nodeUsageList, nil
+	return nodeUsageList
 }
 
 func resourceThreshold(nodeCapacity v1.ResourceList, resourceName v1.ResourceName, threshold api.Percentage) *resource.Quantity {
@@ -239,7 +219,7 @@ func usageToKeysAndValues(usage map[v1.ResourceName]*resource.Quantity) []interf
 		keysAndValues = append(keysAndValues, "Pods", quantity.Value())
 	}
 	for name := range usage {
-		if !node.IsBasicResource(name) {
+		if !nodeutil.IsBasicResource(name) {
 			keysAndValues = append(keysAndValues, string(name), usage[name].Value())
 		}
 	}
@@ -258,6 +238,7 @@ func evictPodsFromSourceNodes(
 	podFilter func(pod *v1.Pod) bool,
 	resourceNames []v1.ResourceName,
 	continueEviction continueEvictionCond,
+	usageClient usageClient,
 ) {
 	// upper bound on total number of pods/cpu/memory and optional extended resources to be moved
 	totalAvailableUsage := map[v1.ResourceName]*resource.Quantity{}
@@ -270,6 +251,10 @@ func evictPodsFromSourceNodes(
 		taintsOfDestinationNodes[node.node.Name] = node.node.Spec.Taints
 
 		for _, name := range resourceNames {
+			if _, exists := node.usage[name]; !exists {
+				klog.Errorf("unable to find %q resource in node's %q usage, terminating eviction", name, node.node.Name)
+				return
+			}
 			if _, ok := totalAvailableUsage[name]; !ok {
 				totalAvailableUsage[name] = resource.NewQuantity(0, resource.DecimalSI)
 			}
@@ -295,7 +280,7 @@ func evictPodsFromSourceNodes(
 		klog.V(1).InfoS("Evicting pods based on priority, if they have same priority, they'll be evicted based on QoS tiers")
 		// sort the evictable Pods based on priority. This also sorts them based on QoS. If there are multiple pods with same priority, they are sorted based on QoS tiers.
 		podutil.SortPodsBasedOnPriorityLowToHigh(removablePods)
-		err := evictPods(ctx, evictableNamespaces, removablePods, node, totalAvailableUsage, taintsOfDestinationNodes, podEvictor, evictOptions, continueEviction)
+		err := evictPods(ctx, evictableNamespaces, removablePods, node, totalAvailableUsage, taintsOfDestinationNodes, podEvictor, evictOptions, continueEviction, usageClient)
 		if err != nil {
 			switch err.(type) {
 			case *evictions.EvictionTotalLimitError:
@@ -316,6 +301,7 @@ func evictPods(
 	podEvictor frameworktypes.Evictor,
 	evictOptions evictions.EvictOptions,
 	continueEviction continueEvictionCond,
+	usageClient usageClient,
 ) error {
 	var excludedNamespaces sets.Set[string]
 	if evictableNamespaces != nil {
@@ -341,6 +327,11 @@ func evictPods(
 			if !preEvictionFilterWithOptions(pod) {
 				continue
 			}
+			podUsage, err := usageClient.podUsage(pod)
+			if err != nil {
+				klog.Errorf("unable to get pod usage for %v/%v: %v", pod.Namespace, pod.Name, err)
+				continue
+			}
 			err = podEvictor.Evict(ctx, pod, evictOptions)
 			if err == nil {
 				klog.V(3).InfoS("Evicted pods", "pod", klog.KObj(pod))
@@ -350,9 +341,8 @@ func evictPods(
 					nodeInfo.usage[name].Sub(*resource.NewQuantity(1, resource.DecimalSI))
 					totalAvailableUsage[name].Sub(*resource.NewQuantity(1, resource.DecimalSI))
 				} else {
-					quantity := utils.GetResourceRequestQuantity(pod, name)
-					nodeInfo.usage[name].Sub(quantity)
-					totalAvailableUsage[name].Sub(quantity)
+					nodeInfo.usage[name].Sub(*podUsage[name])
+					totalAvailableUsage[name].Sub(*podUsage[name])
 				}
 			}
 
@@ -456,24 +446,12 @@ func classifyPods(pods []*v1.Pod, filter func(pod *v1.Pod) bool) ([]*v1.Pod, []*
 	return nonRemovablePods, removablePods
 }
 
-func averageNodeBasicresources(nodes []*v1.Node, getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc, resourceNames []v1.ResourceName) (api.ResourceThresholds, error) {
+func averageNodeBasicresources(nodes []*v1.Node, usageClient usageClient) api.ResourceThresholds {
 	total := api.ResourceThresholds{}
 	average := api.ResourceThresholds{}
 	numberOfNodes := len(nodes)
 	for _, node := range nodes {
-		pods, err := podutil.ListPodsOnANode(node.Name, getPodsAssignedToNode, nil)
-		if err != nil {
-			numberOfNodes--
-			continue
-		}
-		usage, err := nodeutil.NodeUtilization(pods, resourceNames, func(pod *v1.Pod) (v1.ResourceList, error) {
-			req, _ := utils.PodRequestsAndLimits(pod)
-			return req, nil
-		})
-		if err != nil {
-			return nil, err
-		}
-
+		usage := usageClient.nodeUtilization(node.Name)
 		nodeCapacity := node.Status.Capacity
 		if len(node.Status.Allocatable) > 0 {
 			nodeCapacity = node.Status.Allocatable
@@ -490,5 +468,5 @@ func averageNodeBasicresources(nodes []*v1.Node, getPodsAssignedToNode podutil.G
 	for resource, value := range total {
 		average[resource] = value / api.Percentage(numberOfNodes)
 	}
-	return average, nil
+	return average
 }
diff --git a/pkg/framework/plugins/nodeutilization/usageclients.go b/pkg/framework/plugins/nodeutilization/usageclients.go
new file mode 100644
index 000000000..608523897
--- /dev/null
+++ b/pkg/framework/plugins/nodeutilization/usageclients.go
@@ -0,0 +1,102 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package nodeutilization
+
+import (
+	"fmt"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	"k8s.io/klog/v2"
+	utilptr "k8s.io/utils/ptr"
+	nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
+	podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
+	"sigs.k8s.io/descheduler/pkg/utils"
+)
+
+type usageClient interface {
+	// Both low/high node utilization plugins are expected to invoke sync right
+	// after Balance method is invoked. There's no cache invalidation so each
+	// Balance is expected to get the latest data by invoking sync.
+	sync(nodes []*v1.Node) error
+	nodeUtilization(node string) map[v1.ResourceName]*resource.Quantity
+	pods(node string) []*v1.Pod
+	podUsage(pod *v1.Pod) (map[v1.ResourceName]*resource.Quantity, error)
+}
+
+type requestedUsageClient struct {
+	resourceNames         []v1.ResourceName
+	getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc
+
+	_pods            map[string][]*v1.Pod
+	_nodeUtilization map[string]map[v1.ResourceName]*resource.Quantity
+}
+
+var _ usageClient = &requestedUsageClient{}
+
+func newRequestedUsageClient(
+	resourceNames []v1.ResourceName,
+	getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc,
+) *requestedUsageClient {
+	return &requestedUsageClient{
+		resourceNames:         resourceNames,
+		getPodsAssignedToNode: getPodsAssignedToNode,
+	}
+}
+
+func (s *requestedUsageClient) nodeUtilization(node string) map[v1.ResourceName]*resource.Quantity {
+	return s._nodeUtilization[node]
+}
+
+func (s *requestedUsageClient) pods(node string) []*v1.Pod {
+	return s._pods[node]
+}
+
+func (s *requestedUsageClient) podUsage(pod *v1.Pod) (map[v1.ResourceName]*resource.Quantity, error) {
+	usage := make(map[v1.ResourceName]*resource.Quantity)
+	for _, resourceName := range s.resourceNames {
+		usage[resourceName] = utilptr.To[resource.Quantity](utils.GetResourceRequestQuantity(pod, resourceName).DeepCopy())
+	}
+	return usage, nil
+}
+
+func (s *requestedUsageClient) sync(nodes []*v1.Node) error {
+	s._nodeUtilization = make(map[string]map[v1.ResourceName]*resource.Quantity)
+	s._pods = make(map[string][]*v1.Pod)
+
+	for _, node := range nodes {
+		pods, err := podutil.ListPodsOnANode(node.Name, s.getPodsAssignedToNode, nil)
+		if err != nil {
+			klog.V(2).InfoS("Node will not be processed, error accessing its pods", "node", klog.KObj(node), "err", err)
+			return fmt.Errorf("error accessing %q node's pods: %v", node.Name, err)
+		}
+
+		nodeUsage, err := nodeutil.NodeUtilization(pods, s.resourceNames, func(pod *v1.Pod) (v1.ResourceList, error) {
+			req, _ := utils.PodRequestsAndLimits(pod)
+			return req, nil
+		})
+		if err != nil {
+			return err
+		}
+
+		// store the snapshot of pods from the same (or the closest) node utilization computation
+		s._pods[node.Name] = pods
+		s._nodeUtilization[node.Name] = nodeUsage
+	}
+
+	return nil
+}
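Below is a minimal sketch of what a second usageClient implementation could look like, i.e. the kind of "actual usage" client the commit message mentions, backed by the metrics.k8s.io API (metrics-server) rather than pod requests. Only the usageClient interface shape and the podutil helper come from this patch; the actualUsageClient name, the metrics clientset wiring, the context.TODO usage, and the pods-resource fallback are illustrative assumptions, not code from the descheduler tree.

package nodeutilization

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	metricsclient "k8s.io/metrics/pkg/client/clientset/versioned"
	podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
)

// actualUsageClient is a hypothetical usageClient that reads live node and pod
// usage from the metrics.k8s.io API instead of summing pod resource requests.
type actualUsageClient struct {
	resourceNames         []v1.ResourceName
	getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc
	metricsClient         metricsclient.Interface

	_pods            map[string][]*v1.Pod
	_nodeUtilization map[string]map[v1.ResourceName]*resource.Quantity
}

var _ usageClient = &actualUsageClient{}

func (c *actualUsageClient) nodeUtilization(node string) map[v1.ResourceName]*resource.Quantity {
	return c._nodeUtilization[node]
}

func (c *actualUsageClient) pods(node string) []*v1.Pod {
	return c._pods[node]
}

func (c *actualUsageClient) podUsage(pod *v1.Pod) (map[v1.ResourceName]*resource.Quantity, error) {
	// Sum the per-container usage reported by metrics-server for this pod.
	podMetrics, err := c.metricsClient.MetricsV1beta1().PodMetricses(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
	if err != nil {
		return nil, fmt.Errorf("unable to get metrics for %v/%v: %v", pod.Namespace, pod.Name, err)
	}
	usage := make(map[v1.ResourceName]*resource.Quantity)
	for _, resourceName := range c.resourceNames {
		total := resource.NewQuantity(0, resource.DecimalSI)
		for _, container := range podMetrics.Containers {
			if quantity, exists := container.Usage[resourceName]; exists {
				total.Add(quantity)
			}
		}
		usage[resourceName] = total
	}
	return usage, nil
}

func (c *actualUsageClient) sync(nodes []*v1.Node) error {
	c._nodeUtilization = make(map[string]map[v1.ResourceName]*resource.Quantity)
	c._pods = make(map[string][]*v1.Pod)

	for _, node := range nodes {
		pods, err := podutil.ListPodsOnANode(node.Name, c.getPodsAssignedToNode, nil)
		if err != nil {
			return fmt.Errorf("error accessing %q node's pods: %v", node.Name, err)
		}

		nodeMetrics, err := c.metricsClient.MetricsV1beta1().NodeMetricses().Get(context.TODO(), node.Name, metav1.GetOptions{})
		if err != nil {
			return fmt.Errorf("unable to get metrics for node %q: %v", node.Name, err)
		}

		usage := make(map[v1.ResourceName]*resource.Quantity)
		for _, resourceName := range c.resourceNames {
			// Missing resources yield a zero quantity, mirroring an "unused" reading.
			quantity := nodeMetrics.Usage[resourceName].DeepCopy()
			usage[resourceName] = &quantity
		}
		// The pods resource is not part of the metrics API; fall back to the pod count.
		usage[v1.ResourcePods] = resource.NewQuantity(int64(len(pods)), resource.DecimalSI)

		c._pods[node.Name] = pods
		c._nodeUtilization[node.Name] = usage
	}

	return nil
}

Because Balance only talks to the usageClient interface (sync, nodeUtilization, pods, podUsage), a client like this could be handed to getNodeUsage, getNodeThresholds, and evictPodsFromSourceNodes in place of newRequestedUsageClient without touching the balancing logic; how such a client would actually be constructed and configured is outside the scope of this patch.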