From 646a383b37f40cc8c910360cb2bd4578920adb83 Mon Sep 17 00:00:00 2001 From: Jan Chaloupka Date: Tue, 5 Nov 2024 14:07:59 +0100 Subject: [PATCH] Get pod usage from the usage client --- .../metricscollector/metricscollector.go | 4 + .../nodeutilization/nodeutilization.go | 17 ++-- .../plugins/nodeutilization/usageclients.go | 58 ++++++++++--- .../nodeutilization/usageclients_test.go | 1 - .../metrics/v1beta1/fake/fake_nodemetrics.go | 77 ++++++++++++++++++ .../metrics/v1beta1/fake/fake_podmetrics.go | 81 +++++++++++++++++++ 6 files changed, 221 insertions(+), 17 deletions(-) create mode 100644 vendor/k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1/fake/fake_nodemetrics.go create mode 100644 vendor/k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1/fake/fake_podmetrics.go diff --git a/pkg/descheduler/metricscollector/metricscollector.go b/pkg/descheduler/metricscollector/metricscollector.go index abaa1b71c..07e3f5dea 100644 --- a/pkg/descheduler/metricscollector/metricscollector.go +++ b/pkg/descheduler/metricscollector/metricscollector.go @@ -79,6 +79,10 @@ func (mc *MetricsCollector) NodeUsage(node *v1.Node) (map[v1.ResourceName]*resou }, nil } +func (mc *MetricsCollector) MetricsClient() metricsclient.Interface { + return mc.metricsClientset +} + func (mc *MetricsCollector) Collect(ctx context.Context) error { mc.mu.Lock() defer mc.mu.Unlock() diff --git a/pkg/framework/plugins/nodeutilization/nodeutilization.go b/pkg/framework/plugins/nodeutilization/nodeutilization.go index 7dd3e5e2d..b3a3afcde 100644 --- a/pkg/framework/plugins/nodeutilization/nodeutilization.go +++ b/pkg/framework/plugins/nodeutilization/nodeutilization.go @@ -18,6 +18,7 @@ package nodeutilization import ( "context" + "fmt" "math" "sort" @@ -219,6 +220,7 @@ func evictPodsFromSourceNodes( podFilter func(pod *v1.Pod) bool, resourceNames []v1.ResourceName, continueEviction continueEvictionCond, + usageSnapshot usageClient, ) { // upper bound on total number of pods/cpu/memory and optional extended resources to be moved totalAvailableUsage := map[v1.ResourceName]*resource.Quantity{ @@ -267,7 +269,7 @@ func evictPodsFromSourceNodes( klog.V(1).InfoS("Evicting pods based on priority, if they have same priority, they'll be evicted based on QoS tiers") // sort the evictable Pods based on priority. This also sorts them based on QoS. If there are multiple pods with same priority, they are sorted based on QoS tiers. podutil.SortPodsBasedOnPriorityLowToHigh(removablePods) - err := evictPods(ctx, evictableNamespaces, removablePods, node, totalAvailableUsage, taintsOfDestinationNodes, podEvictor, evictOptions, continueEviction) + err := evictPods(ctx, evictableNamespaces, removablePods, node, totalAvailableUsage, taintsOfDestinationNodes, podEvictor, evictOptions, continueEviction, usageSnapshot) if err != nil { switch err.(type) { case *evictions.EvictionTotalLimitError: @@ -288,6 +290,7 @@ func evictPods( podEvictor frameworktypes.Evictor, evictOptions evictions.EvictOptions, continueEviction continueEvictionCond, + usageSnapshot usageClient, ) error { var excludedNamespaces sets.Set[string] if evictableNamespaces != nil { @@ -296,6 +299,7 @@ func evictPods( if continueEviction(nodeInfo, totalAvailableUsage) { for _, pod := range inputPods { + fmt.Printf("pods: %v\n", pod.Name) if !utils.PodToleratesTaints(pod, taintsOfLowNodes) { klog.V(3).InfoS("Skipping eviction for pod, doesn't tolerate node taint", "pod", klog.KObj(pod)) continue @@ -313,18 +317,21 @@ func evictPods( if !preEvictionFilterWithOptions(pod) { continue } + podUsage, err := usageSnapshot.podUsage(pod) + if err != nil { + klog.ErrorS(err, "unable to get pod usage for %v/%v: %v", pod.Namespace, pod.Name, err) + continue + } err = podEvictor.Evict(ctx, pod, evictOptions) if err == nil { klog.V(3).InfoS("Evicted pods", "pod", klog.KObj(pod)) - for name := range totalAvailableUsage { if name == v1.ResourcePods { nodeInfo.usage[name].Sub(*resource.NewQuantity(1, resource.DecimalSI)) totalAvailableUsage[name].Sub(*resource.NewQuantity(1, resource.DecimalSI)) } else { - quantity := utils.GetResourceRequestQuantity(pod, name) - nodeInfo.usage[name].Sub(quantity) - totalAvailableUsage[name].Sub(quantity) + nodeInfo.usage[name].Sub(*podUsage[name]) + totalAvailableUsage[name].Sub(*podUsage[name]) } } diff --git a/pkg/framework/plugins/nodeutilization/usageclients.go b/pkg/framework/plugins/nodeutilization/usageclients.go index 70c28e9e3..384418217 100644 --- a/pkg/framework/plugins/nodeutilization/usageclients.go +++ b/pkg/framework/plugins/nodeutilization/usageclients.go @@ -17,10 +17,14 @@ limitations under the License. package nodeutilization import ( + "context" + "fmt" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" - metricsclient "k8s.io/metrics/pkg/client/clientset/versioned" + utilptr "k8s.io/utils/ptr" "sigs.k8s.io/descheduler/pkg/descheduler/metricscollector" nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node" podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod" @@ -32,6 +36,7 @@ type usageClient interface { nodes() []*v1.Node pods(node string) []*v1.Pod capture(nodes []*v1.Node) error + podUsage(pod *v1.Pod) (map[v1.ResourceName]*resource.Quantity, error) } type requestedUsageClient struct { @@ -67,10 +72,18 @@ func (s *requestedUsageClient) pods(node string) []*v1.Pod { return s._pods[node] } +func (s *requestedUsageClient) podUsage(pod *v1.Pod) (map[v1.ResourceName]*resource.Quantity, error) { + usage := make(map[v1.ResourceName]*resource.Quantity) + for _, resourceName := range s.resourceNames { + usage[resourceName] = utilptr.To[resource.Quantity](utils.GetResourceRequestQuantity(pod, resourceName).DeepCopy()) + } + return usage, nil +} + func (s *requestedUsageClient) capture(nodes []*v1.Node) error { s._nodeUtilization = make(map[string]map[v1.ResourceName]*resource.Quantity) s._pods = make(map[string][]*v1.Pod) - capturedNodes := []*v1.Node{} + capturedNodes := []*v1.Node{} for _, node := range nodes { pods, err := podutil.ListPodsOnANode(node.Name, s.getPodsAssignedToNode, nil) @@ -90,10 +103,10 @@ func (s *requestedUsageClient) capture(nodes []*v1.Node) error { // store the snapshot of pods from the same (or the closest) node utilization computation s._pods[node.Name] = pods s._nodeUtilization[node.Name] = nodeUsage - capturedNodes = append(capturedNodes, node) + capturedNodes = append(capturedNodes, node) } - s._nodes = capturedNodes + s._nodes = capturedNodes return nil } @@ -102,7 +115,6 @@ type actualUsageClient struct { resourceNames []v1.ResourceName getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc metricsCollector *metricscollector.MetricsCollector - metricsClientset metricsclient.Interface _nodes []*v1.Node _pods map[string][]*v1.Pod @@ -115,13 +127,11 @@ func newActualUsageSnapshot( resourceNames []v1.ResourceName, getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc, metricsCollector *metricscollector.MetricsCollector, - metricsClientset metricsclient.Interface, ) *actualUsageClient { return &actualUsageClient{ resourceNames: resourceNames, getPodsAssignedToNode: getPodsAssignedToNode, metricsCollector: metricsCollector, - metricsClientset: metricsClientset, } } @@ -137,10 +147,35 @@ func (client *actualUsageClient) pods(node string) []*v1.Pod { return client._pods[node] } +func (client *actualUsageClient) podUsage(pod *v1.Pod) (map[v1.ResourceName]*resource.Quantity, error) { + // It's not efficient to keep track of all pods in a cluster when only their fractions is evicted. + // Thus, take the current pod metrics without computing any softening (like e.g. EWMA). + podMetrics, err := client.metricsCollector.MetricsClient().MetricsV1beta1().PodMetricses(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("unable to get podmetrics for %q/%q: %v", pod.Namespace, pod.Name, err) + } + + totalUsage := make(map[v1.ResourceName]*resource.Quantity) + for _, container := range podMetrics.Containers { + for _, resourceName := range client.resourceNames { + if _, exists := container.Usage[resourceName]; !exists { + continue + } + if totalUsage[resourceName] == nil { + totalUsage[resourceName] = utilptr.To[resource.Quantity](container.Usage[resourceName].DeepCopy()) + } else { + totalUsage[resourceName].Add(container.Usage[resourceName]) + } + } + } + + return totalUsage, nil +} + func (client *actualUsageClient) capture(nodes []*v1.Node) error { client._nodeUtilization = make(map[string]map[v1.ResourceName]*resource.Quantity) client._pods = make(map[string][]*v1.Pod) - capturedNodes := []*v1.Node{} + capturedNodes := []*v1.Node{} for _, node := range nodes { pods, err := podutil.ListPodsOnANode(node.Name, client.getPodsAssignedToNode, nil) @@ -149,18 +184,19 @@ func (client *actualUsageClient) capture(nodes []*v1.Node) error { continue } - nodeUsage, err := client.metricsCollector.NodeUsage(node) + nodeUsage, err := client.metricsCollector.NodeUsage(node) if err != nil { return err } + nodeUsage[v1.ResourcePods] = resource.NewQuantity(int64(len(pods)), resource.DecimalSI) // store the snapshot of pods from the same (or the closest) node utilization computation client._pods[node.Name] = pods client._nodeUtilization[node.Name] = nodeUsage - capturedNodes = append(capturedNodes, node) + capturedNodes = append(capturedNodes, node) } - client._nodes = capturedNodes + client._nodes = capturedNodes return nil } diff --git a/pkg/framework/plugins/nodeutilization/usageclients_test.go b/pkg/framework/plugins/nodeutilization/usageclients_test.go index 41a3fcc82..a65ca69c4 100644 --- a/pkg/framework/plugins/nodeutilization/usageclients_test.go +++ b/pkg/framework/plugins/nodeutilization/usageclients_test.go @@ -116,7 +116,6 @@ func TestActualUsageClient(t *testing.T) { resourceNames, podsAssignedToNode, collector, - metricsClientset, ) updateMetricsAndCheckNodeUtilization(t, ctx, diff --git a/vendor/k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1/fake/fake_nodemetrics.go b/vendor/k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1/fake/fake_nodemetrics.go new file mode 100644 index 000000000..a07bad65a --- /dev/null +++ b/vendor/k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1/fake/fake_nodemetrics.go @@ -0,0 +1,77 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + "context" + + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" + v1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1" +) + +// FakeNodeMetricses implements NodeMetricsInterface +type FakeNodeMetricses struct { + Fake *FakeMetricsV1beta1 +} + +var nodemetricsesResource = v1beta1.SchemeGroupVersion.WithResource("nodemetricses") + +var nodemetricsesKind = v1beta1.SchemeGroupVersion.WithKind("NodeMetrics") + +// Get takes name of the nodeMetrics, and returns the corresponding nodeMetrics object, and an error if there is any. +func (c *FakeNodeMetricses) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1beta1.NodeMetrics, err error) { + emptyResult := &v1beta1.NodeMetrics{} + obj, err := c.Fake. + Invokes(testing.NewRootGetActionWithOptions(nodemetricsesResource, name, options), emptyResult) + if obj == nil { + return emptyResult, err + } + return obj.(*v1beta1.NodeMetrics), err +} + +// List takes label and field selectors, and returns the list of NodeMetricses that match those selectors. +func (c *FakeNodeMetricses) List(ctx context.Context, opts v1.ListOptions) (result *v1beta1.NodeMetricsList, err error) { + emptyResult := &v1beta1.NodeMetricsList{} + obj, err := c.Fake. + Invokes(testing.NewRootListActionWithOptions(nodemetricsesResource, nodemetricsesKind, opts), emptyResult) + if obj == nil { + return emptyResult, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &v1beta1.NodeMetricsList{ListMeta: obj.(*v1beta1.NodeMetricsList).ListMeta} + for _, item := range obj.(*v1beta1.NodeMetricsList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested nodeMetricses. +func (c *FakeNodeMetricses) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewRootWatchActionWithOptions(nodemetricsesResource, opts)) +} diff --git a/vendor/k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1/fake/fake_podmetrics.go b/vendor/k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1/fake/fake_podmetrics.go new file mode 100644 index 000000000..322b98f3e --- /dev/null +++ b/vendor/k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1/fake/fake_podmetrics.go @@ -0,0 +1,81 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + "context" + + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" + v1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1" +) + +// FakePodMetricses implements PodMetricsInterface +type FakePodMetricses struct { + Fake *FakeMetricsV1beta1 + ns string +} + +var podmetricsesResource = v1beta1.SchemeGroupVersion.WithResource("podmetricses") + +var podmetricsesKind = v1beta1.SchemeGroupVersion.WithKind("PodMetrics") + +// Get takes name of the podMetrics, and returns the corresponding podMetrics object, and an error if there is any. +func (c *FakePodMetricses) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1beta1.PodMetrics, err error) { + emptyResult := &v1beta1.PodMetrics{} + obj, err := c.Fake. + Invokes(testing.NewGetActionWithOptions(podmetricsesResource, c.ns, name, options), emptyResult) + + if obj == nil { + return emptyResult, err + } + return obj.(*v1beta1.PodMetrics), err +} + +// List takes label and field selectors, and returns the list of PodMetricses that match those selectors. +func (c *FakePodMetricses) List(ctx context.Context, opts v1.ListOptions) (result *v1beta1.PodMetricsList, err error) { + emptyResult := &v1beta1.PodMetricsList{} + obj, err := c.Fake. + Invokes(testing.NewListActionWithOptions(podmetricsesResource, podmetricsesKind, c.ns, opts), emptyResult) + + if obj == nil { + return emptyResult, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &v1beta1.PodMetricsList{ListMeta: obj.(*v1beta1.PodMetricsList).ListMeta} + for _, item := range obj.(*v1beta1.PodMetricsList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested podMetricses. +func (c *FakePodMetricses) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewWatchActionWithOptions(podmetricsesResource, c.ns, opts)) + +}