mirror of https://github.com/kubernetes-sigs/descheduler.git
Get pod usage from the usage client
@@ -79,6 +79,10 @@ func (mc *MetricsCollector) NodeUsage(node *v1.Node) (map[v1.ResourceName]*resou
    }, nil
}

+func (mc *MetricsCollector) MetricsClient() metricsclient.Interface {
+    return mc.metricsClientset
+}
+
func (mc *MetricsCollector) Collect(ctx context.Context) error {
    mc.mu.Lock()
    defer mc.mu.Unlock()
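The new MetricsClient() accessor exposes the collector's metrics clientset so callers can query the metrics.k8s.io API directly instead of holding a clientset of their own (the actual-usage client below does exactly this). A rough sketch of the call path — not part of the commit; namespace and pod name are placeholders:

// Illustrative only: mc is assumed to be an initialized *MetricsCollector.
package example

import (
    "context"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/klog/v2"

    "sigs.k8s.io/descheduler/pkg/descheduler/metricscollector"
)

func dumpPodUsage(mc *metricscollector.MetricsCollector) {
    podMetrics, err := mc.MetricsClient().MetricsV1beta1().PodMetricses("kube-system").Get(context.TODO(), "some-pod", metav1.GetOptions{})
    if err != nil {
        klog.ErrorS(err, "unable to read pod metrics")
        return
    }
    for _, container := range podMetrics.Containers {
        klog.InfoS("container usage", "container", container.Name, "cpu", container.Usage.Cpu(), "memory", container.Usage.Memory())
    }
}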
@@ -18,6 +18,7 @@ package nodeutilization

import (
    "context"
+    "fmt"
    "math"
    "sort"

@@ -219,6 +220,7 @@ func evictPodsFromSourceNodes(
    podFilter func(pod *v1.Pod) bool,
    resourceNames []v1.ResourceName,
    continueEviction continueEvictionCond,
+    usageSnapshot usageClient,
) {
    // upper bound on total number of pods/cpu/memory and optional extended resources to be moved
    totalAvailableUsage := map[v1.ResourceName]*resource.Quantity{
@@ -267,7 +269,7 @@ func evictPodsFromSourceNodes(
        klog.V(1).InfoS("Evicting pods based on priority, if they have same priority, they'll be evicted based on QoS tiers")
        // sort the evictable Pods based on priority. This also sorts them based on QoS. If there are multiple pods with same priority, they are sorted based on QoS tiers.
        podutil.SortPodsBasedOnPriorityLowToHigh(removablePods)
-        err := evictPods(ctx, evictableNamespaces, removablePods, node, totalAvailableUsage, taintsOfDestinationNodes, podEvictor, evictOptions, continueEviction)
+        err := evictPods(ctx, evictableNamespaces, removablePods, node, totalAvailableUsage, taintsOfDestinationNodes, podEvictor, evictOptions, continueEviction, usageSnapshot)
        if err != nil {
            switch err.(type) {
            case *evictions.EvictionTotalLimitError:
@@ -288,6 +290,7 @@ func evictPods(
    podEvictor frameworktypes.Evictor,
    evictOptions evictions.EvictOptions,
    continueEviction continueEvictionCond,
+    usageSnapshot usageClient,
) error {
    var excludedNamespaces sets.Set[string]
    if evictableNamespaces != nil {
@@ -296,6 +299,7 @@ func evictPods(

    if continueEviction(nodeInfo, totalAvailableUsage) {
        for _, pod := range inputPods {
+            fmt.Printf("pods: %v\n", pod.Name)
            if !utils.PodToleratesTaints(pod, taintsOfLowNodes) {
                klog.V(3).InfoS("Skipping eviction for pod, doesn't tolerate node taint", "pod", klog.KObj(pod))
                continue
@@ -313,18 +317,21 @@ func evictPods(
            if !preEvictionFilterWithOptions(pod) {
                continue
            }
+            podUsage, err := usageSnapshot.podUsage(pod)
+            if err != nil {
+                klog.ErrorS(err, "unable to get pod usage for %v/%v: %v", pod.Namespace, pod.Name, err)
+                continue
+            }
            err = podEvictor.Evict(ctx, pod, evictOptions)
            if err == nil {
                klog.V(3).InfoS("Evicted pods", "pod", klog.KObj(pod))

                for name := range totalAvailableUsage {
                    if name == v1.ResourcePods {
                        nodeInfo.usage[name].Sub(*resource.NewQuantity(1, resource.DecimalSI))
                        totalAvailableUsage[name].Sub(*resource.NewQuantity(1, resource.DecimalSI))
                    } else {
-                        quantity := utils.GetResourceRequestQuantity(pod, name)
-                        nodeInfo.usage[name].Sub(quantity)
-                        totalAvailableUsage[name].Sub(quantity)
+                        nodeInfo.usage[name].Sub(*podUsage[name])
+                        totalAvailableUsage[name].Sub(*podUsage[name])
                    }
                }

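The bookkeeping above now subtracts what the usage client reports for the evicted pod (its podUsage) rather than the pod's resource requests, both from the node's tracked usage and from the remaining headroom in totalAvailableUsage; the pod count still drops by one per eviction. A small self-contained sketch of that Quantity arithmetic with made-up numbers — not part of the commit:

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
)

func main() {
    // Remaining headroom on the destination nodes (made-up values).
    totalAvailableUsage := map[v1.ResourceName]*resource.Quantity{
        v1.ResourceCPU:    resource.NewMilliQuantity(1500, resource.DecimalSI),
        v1.ResourceMemory: resource.NewQuantity(2<<30, resource.BinarySI),
        v1.ResourcePods:   resource.NewQuantity(10, resource.DecimalSI),
    }
    // Usage reported by the usage client for the evicted pod (made-up values).
    podUsage := map[v1.ResourceName]*resource.Quantity{
        v1.ResourceCPU:    resource.NewMilliQuantity(250, resource.DecimalSI),
        v1.ResourceMemory: resource.NewQuantity(512<<20, resource.BinarySI),
    }

    for name := range totalAvailableUsage {
        if name == v1.ResourcePods {
            // One pod was evicted.
            totalAvailableUsage[name].Sub(*resource.NewQuantity(1, resource.DecimalSI))
            continue
        }
        totalAvailableUsage[name].Sub(*podUsage[name])
    }

    fmt.Println(totalAvailableUsage[v1.ResourceCPU])    // 1250m
    fmt.Println(totalAvailableUsage[v1.ResourceMemory]) // 1536Mi
    fmt.Println(totalAvailableUsage[v1.ResourcePods])   // 9
}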
@@ -17,10 +17,14 @@ limitations under the License.
package nodeutilization

import (
+    "context"
+    "fmt"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/klog/v2"
    metricsclient "k8s.io/metrics/pkg/client/clientset/versioned"
+    utilptr "k8s.io/utils/ptr"
    "sigs.k8s.io/descheduler/pkg/descheduler/metricscollector"
    nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
    podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
@@ -32,6 +36,7 @@ type usageClient interface {
    nodes() []*v1.Node
    pods(node string) []*v1.Pod
    capture(nodes []*v1.Node) error
+    podUsage(pod *v1.Pod) (map[v1.ResourceName]*resource.Quantity, error)
}

type requestedUsageClient struct {
@@ -67,10 +72,18 @@ func (s *requestedUsageClient) pods(node string) []*v1.Pod {
    return s._pods[node]
}

+func (s *requestedUsageClient) podUsage(pod *v1.Pod) (map[v1.ResourceName]*resource.Quantity, error) {
+    usage := make(map[v1.ResourceName]*resource.Quantity)
+    for _, resourceName := range s.resourceNames {
+        usage[resourceName] = utilptr.To[resource.Quantity](utils.GetResourceRequestQuantity(pod, resourceName).DeepCopy())
+    }
+    return usage, nil
+}
+
func (s *requestedUsageClient) capture(nodes []*v1.Node) error {
    s._nodeUtilization = make(map[string]map[v1.ResourceName]*resource.Quantity)
    s._pods = make(map[string][]*v1.Pod)
    capturedNodes := []*v1.Node{}

    for _, node := range nodes {
        pods, err := podutil.ListPodsOnANode(node.Name, s.getPodsAssignedToNode, nil)
@@ -90,10 +103,10 @@ func (s *requestedUsageClient) capture(nodes []*v1.Node) error {
        // store the snapshot of pods from the same (or the closest) node utilization computation
        s._pods[node.Name] = pods
        s._nodeUtilization[node.Name] = nodeUsage
        capturedNodes = append(capturedNodes, node)
    }

    s._nodes = capturedNodes

    return nil
}
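For the requests-based client, podUsage simply reports the pod's resource requests, one deep-copied Quantity per configured resource name. A hedged sketch of what that amounts to for a pod requesting 100m CPU and 200Mi of memory — not part of the commit; values are illustrative and the utils import path is assumed from the descheduler tree:

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    "sigs.k8s.io/descheduler/pkg/utils"
)

func main() {
    pod := &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{Name: "example", Namespace: "default"},
        Spec: v1.PodSpec{
            Containers: []v1.Container{{
                Name: "app",
                Resources: v1.ResourceRequirements{
                    Requests: v1.ResourceList{
                        v1.ResourceCPU:    resource.MustParse("100m"),
                        v1.ResourceMemory: resource.MustParse("200Mi"),
                    },
                },
            }},
        },
    }

    // Mirrors requestedUsageClient.podUsage: "usage" is just the request.
    for _, name := range []v1.ResourceName{v1.ResourceCPU, v1.ResourceMemory} {
        quantity := utils.GetResourceRequestQuantity(pod, name)
        fmt.Printf("%s: %s\n", name, quantity.String()) // cpu: 100m, memory: 200Mi
    }
}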
@@ -102,7 +115,6 @@ type actualUsageClient struct {
    resourceNames         []v1.ResourceName
    getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc
    metricsCollector      *metricscollector.MetricsCollector
-    metricsClientset      metricsclient.Interface

    _nodes []*v1.Node
    _pods  map[string][]*v1.Pod
@@ -115,13 +127,11 @@ func newActualUsageSnapshot(
    resourceNames []v1.ResourceName,
    getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc,
    metricsCollector *metricscollector.MetricsCollector,
-    metricsClientset metricsclient.Interface,
) *actualUsageClient {
    return &actualUsageClient{
        resourceNames:         resourceNames,
        getPodsAssignedToNode: getPodsAssignedToNode,
        metricsCollector:      metricsCollector,
-        metricsClientset:      metricsClientset,
    }
}

@@ -137,10 +147,35 @@ func (client *actualUsageClient) pods(node string) []*v1.Pod {
    return client._pods[node]
}

+func (client *actualUsageClient) podUsage(pod *v1.Pod) (map[v1.ResourceName]*resource.Quantity, error) {
+    // It's not efficient to keep track of all pods in a cluster when only their fractions is evicted.
+    // Thus, take the current pod metrics without computing any softening (like e.g. EWMA).
+    podMetrics, err := client.metricsCollector.MetricsClient().MetricsV1beta1().PodMetricses(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
+    if err != nil {
+        return nil, fmt.Errorf("unable to get podmetrics for %q/%q: %v", pod.Namespace, pod.Name, err)
+    }
+
+    totalUsage := make(map[v1.ResourceName]*resource.Quantity)
+    for _, container := range podMetrics.Containers {
+        for _, resourceName := range client.resourceNames {
+            if _, exists := container.Usage[resourceName]; !exists {
+                continue
+            }
+            if totalUsage[resourceName] == nil {
+                totalUsage[resourceName] = utilptr.To[resource.Quantity](container.Usage[resourceName].DeepCopy())
+            } else {
+                totalUsage[resourceName].Add(container.Usage[resourceName])
+            }
+        }
+    }
+
+    return totalUsage, nil
+}
+
func (client *actualUsageClient) capture(nodes []*v1.Node) error {
    client._nodeUtilization = make(map[string]map[v1.ResourceName]*resource.Quantity)
    client._pods = make(map[string][]*v1.Pod)
    capturedNodes := []*v1.Node{}

    for _, node := range nodes {
        pods, err := podutil.ListPodsOnANode(node.Name, client.getPodsAssignedToNode, nil)
@@ -149,18 +184,19 @@ func (client *actualUsageClient) capture(nodes []*v1.Node) error {
            continue
        }

        nodeUsage, err := client.metricsCollector.NodeUsage(node)
        if err != nil {
            return err
        }
+        nodeUsage[v1.ResourcePods] = resource.NewQuantity(int64(len(pods)), resource.DecimalSI)

        // store the snapshot of pods from the same (or the closest) node utilization computation
        client._pods[node.Name] = pods
        client._nodeUtilization[node.Name] = nodeUsage
        capturedNodes = append(capturedNodes, node)
    }

    client._nodes = capturedNodes

    return nil
}
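Since the struct no longer carries its own metrics clientset, the actual-usage client is wired up from the collector alone and reaches the metrics API through MetricsCollector.MetricsClient(). A sketch of the new wiring — not part of the commit; it would have to live inside package nodeutilization because the constructor is unexported, and resourceNames, getPodsAssignedToNode, collector and nodes are assumed to be prepared by the surrounding plugin code:

// Illustrative wiring only.
usageClient := newActualUsageSnapshot(
    resourceNames,         // []v1.ResourceName, e.g. cpu, memory, pods
    getPodsAssignedToNode, // podutil.GetPodsAssignedToNodeFunc
    collector,             // *metricscollector.MetricsCollector
)
if err := usageClient.capture(nodes); err != nil {
    klog.ErrorS(err, "capturing node usage failed")
}
for _, node := range usageClient.nodes() {
    klog.InfoS("captured node usage", "node", node.Name, "pods", len(usageClient.pods(node.Name)))
}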
@@ -116,7 +116,6 @@ func TestActualUsageClient(t *testing.T) {
        resourceNames,
        podsAssignedToNode,
        collector,
-        metricsClientset,
    )

    updateMetricsAndCheckNodeUtilization(t, ctx,
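The two vendored files below are the typed fakes that let the test above run without a live metrics-server: a fake metrics clientset can be pre-seeded with PodMetrics/NodeMetrics objects and then served through the same MetricsV1beta1() interface. A hedged sketch of seeding one — not part of the commit; values are made up, and NewSimpleClientset is assumed from k8s.io/metrics/pkg/client/clientset/versioned/fake:

package main

import (
    "context"
    "fmt"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/metrics/pkg/apis/metrics/v1beta1"
    fakemetrics "k8s.io/metrics/pkg/client/clientset/versioned/fake"
)

func main() {
    // Seed the fake clientset with one PodMetrics object (made-up usage numbers).
    metricsClientset := fakemetrics.NewSimpleClientset(&v1beta1.PodMetrics{
        ObjectMeta: metav1.ObjectMeta{Name: "example", Namespace: "default"},
        Containers: []v1beta1.ContainerMetrics{{
            Name: "app",
            Usage: v1.ResourceList{
                v1.ResourceCPU:    resource.MustParse("150m"),
                v1.ResourceMemory: resource.MustParse("300Mi"),
            },
        }},
    })

    pm, err := metricsClientset.MetricsV1beta1().PodMetricses("default").Get(context.TODO(), "example", metav1.GetOptions{})
    if err != nil {
        panic(err)
    }
    fmt.Println(pm.Containers[0].Usage.Cpu()) // 150m
}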
vendor/k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1/fake/fake_nodemetrics.go (generated, vendored, new file, 77 lines)
@@ -0,0 +1,77 @@
/*
Copyright The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Code generated by client-gen. DO NOT EDIT.

package fake

import (
    "context"

    v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    labels "k8s.io/apimachinery/pkg/labels"
    watch "k8s.io/apimachinery/pkg/watch"
    testing "k8s.io/client-go/testing"
    v1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1"
)

// FakeNodeMetricses implements NodeMetricsInterface
type FakeNodeMetricses struct {
    Fake *FakeMetricsV1beta1
}

var nodemetricsesResource = v1beta1.SchemeGroupVersion.WithResource("nodemetricses")

var nodemetricsesKind = v1beta1.SchemeGroupVersion.WithKind("NodeMetrics")

// Get takes name of the nodeMetrics, and returns the corresponding nodeMetrics object, and an error if there is any.
func (c *FakeNodeMetricses) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1beta1.NodeMetrics, err error) {
    emptyResult := &v1beta1.NodeMetrics{}
    obj, err := c.Fake.
        Invokes(testing.NewRootGetActionWithOptions(nodemetricsesResource, name, options), emptyResult)
    if obj == nil {
        return emptyResult, err
    }
    return obj.(*v1beta1.NodeMetrics), err
}

// List takes label and field selectors, and returns the list of NodeMetricses that match those selectors.
func (c *FakeNodeMetricses) List(ctx context.Context, opts v1.ListOptions) (result *v1beta1.NodeMetricsList, err error) {
    emptyResult := &v1beta1.NodeMetricsList{}
    obj, err := c.Fake.
        Invokes(testing.NewRootListActionWithOptions(nodemetricsesResource, nodemetricsesKind, opts), emptyResult)
    if obj == nil {
        return emptyResult, err
    }

    label, _, _ := testing.ExtractFromListOptions(opts)
    if label == nil {
        label = labels.Everything()
    }
    list := &v1beta1.NodeMetricsList{ListMeta: obj.(*v1beta1.NodeMetricsList).ListMeta}
    for _, item := range obj.(*v1beta1.NodeMetricsList).Items {
        if label.Matches(labels.Set(item.Labels)) {
            list.Items = append(list.Items, item)
        }
    }
    return list, err
}

// Watch returns a watch.Interface that watches the requested nodeMetricses.
func (c *FakeNodeMetricses) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) {
    return c.Fake.
        InvokesWatch(testing.NewRootWatchActionWithOptions(nodemetricsesResource, opts))
}
vendor/k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1/fake/fake_podmetrics.go (generated, vendored, new file, 81 lines)
@@ -0,0 +1,81 @@
/*
Copyright The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Code generated by client-gen. DO NOT EDIT.

package fake

import (
    "context"

    v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    labels "k8s.io/apimachinery/pkg/labels"
    watch "k8s.io/apimachinery/pkg/watch"
    testing "k8s.io/client-go/testing"
    v1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1"
)

// FakePodMetricses implements PodMetricsInterface
type FakePodMetricses struct {
    Fake *FakeMetricsV1beta1
    ns   string
}

var podmetricsesResource = v1beta1.SchemeGroupVersion.WithResource("podmetricses")

var podmetricsesKind = v1beta1.SchemeGroupVersion.WithKind("PodMetrics")

// Get takes name of the podMetrics, and returns the corresponding podMetrics object, and an error if there is any.
func (c *FakePodMetricses) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1beta1.PodMetrics, err error) {
    emptyResult := &v1beta1.PodMetrics{}
    obj, err := c.Fake.
        Invokes(testing.NewGetActionWithOptions(podmetricsesResource, c.ns, name, options), emptyResult)

    if obj == nil {
        return emptyResult, err
    }
    return obj.(*v1beta1.PodMetrics), err
}

// List takes label and field selectors, and returns the list of PodMetricses that match those selectors.
func (c *FakePodMetricses) List(ctx context.Context, opts v1.ListOptions) (result *v1beta1.PodMetricsList, err error) {
    emptyResult := &v1beta1.PodMetricsList{}
    obj, err := c.Fake.
        Invokes(testing.NewListActionWithOptions(podmetricsesResource, podmetricsesKind, c.ns, opts), emptyResult)

    if obj == nil {
        return emptyResult, err
    }

    label, _, _ := testing.ExtractFromListOptions(opts)
    if label == nil {
        label = labels.Everything()
    }
    list := &v1beta1.PodMetricsList{ListMeta: obj.(*v1beta1.PodMetricsList).ListMeta}
    for _, item := range obj.(*v1beta1.PodMetricsList).Items {
        if label.Matches(labels.Set(item.Labels)) {
            list.Items = append(list.Items, item)
        }
    }
    return list, err
}

// Watch returns a watch.Interface that watches the requested podMetricses.
func (c *FakePodMetricses) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) {
    return c.Fake.
        InvokesWatch(testing.NewWatchActionWithOptions(podmetricsesResource, c.ns, opts))

}