1
0
mirror of https://github.com/kubernetes-sigs/descheduler.git synced 2026-01-26 13:29:11 +01:00

Compare commits

...

8 Commits

Author SHA1 Message Date
Jan Chaloupka
b56794708d descheduler: wire the metrics collector with the framework handle 2024-11-05 21:13:27 +01:00
Jan Chaloupka
b7b352780e LowNodeUtilization: test metrics based utilization 2024-11-05 21:11:33 +01:00
Jan Chaloupka
646a383b37 Get pod usage from the usage client 2024-11-05 14:07:59 +01:00
Jan Chaloupka
ad18f41b66 Update actualUsageClient 2024-11-04 18:11:27 +01:00
Jan Chaloupka
80f9c0ada6 Separate usage client into a new file 2024-10-21 22:38:25 +02:00
Jan Chaloupka
3174107718 usageSnapshot -> requestedUsageClient 2024-10-21 22:22:38 +02:00
Jan Chaloupka
1f55c4d680 node utilization: abstract pod utilization retriever 2024-10-15 12:18:37 +02:00
Jan Chaloupka
dc9bea3ede nodeutiliation: create a usage snapshot 2024-10-15 12:18:30 +02:00
23 changed files with 1519 additions and 148 deletions

View File

@@ -26,6 +26,8 @@ import (
clientset "k8s.io/client-go/kubernetes"
componentbaseconfig "k8s.io/component-base/config"
componentbaseoptions "k8s.io/component-base/config/options"
metricsclient "k8s.io/metrics/pkg/client/clientset/versioned"
"sigs.k8s.io/descheduler/pkg/apis/componentconfig"
"sigs.k8s.io/descheduler/pkg/apis/componentconfig/v1alpha1"
deschedulerscheme "sigs.k8s.io/descheduler/pkg/descheduler/scheme"
@@ -42,6 +44,7 @@ type DeschedulerServer struct {
Client clientset.Interface
EventClient clientset.Interface
MetricsClient metricsclient.Interface
SecureServing *apiserveroptions.SecureServingOptionsWithLoopback
DisableMetrics bool
EnableHTTP2 bool

View File

@@ -41,6 +41,9 @@ type DeschedulerPolicy struct {
// MaxNoOfPodsToTotal restricts maximum of pods to be evicted total.
MaxNoOfPodsToEvictTotal *uint
// MetricsCollector configures collection of metrics about actual resource utilization
MetricsCollector MetricsCollector
}
// Namespaces carries a list of included/excluded namespaces
@@ -84,3 +87,10 @@ type PluginSet struct {
Enabled []string
Disabled []string
}
// MetricsCollector configures collection of metrics about actual resource utilization
type MetricsCollector struct {
// Enabled metrics collection from kubernetes metrics.
// Later, the collection can be extended to other providers.
Enabled bool
}

View File

@@ -40,6 +40,9 @@ type DeschedulerPolicy struct {
// MaxNoOfPodsToTotal restricts maximum of pods to be evicted total.
MaxNoOfPodsToEvictTotal *uint `json:"maxNoOfPodsToEvictTotal,omitempty"`
// MetricsCollector configures collection of metrics about actual resource utilization
MetricsCollector MetricsCollector `json:"metricsCollector,omitempty"`
}
type DeschedulerProfile struct {
@@ -66,3 +69,10 @@ type PluginSet struct {
Enabled []string `json:"enabled"`
Disabled []string `json:"disabled"`
}
// MetricsCollector configures collection of metrics about actual resource utilization
type MetricsCollector struct {
// Enabled metrics collection from kubernetes metrics.
// Later, the collection can be extended to other providers.
Enabled bool
}

View File

@@ -21,6 +21,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
componentbaseconfig "k8s.io/component-base/config"
metricsclient "k8s.io/metrics/pkg/client/clientset/versioned"
// Ensure to load all auth plugins.
_ "k8s.io/client-go/plugin/pkg/client/auth"
@@ -28,7 +29,7 @@ import (
"k8s.io/client-go/tools/clientcmd"
)
func CreateClient(clientConnection componentbaseconfig.ClientConnectionConfiguration, userAgt string) (clientset.Interface, error) {
func createConfig(clientConnection componentbaseconfig.ClientConnectionConfiguration, userAgt string) (*rest.Config, error) {
var cfg *rest.Config
if len(clientConnection.Kubeconfig) != 0 {
master, err := GetMasterFromKubeconfig(clientConnection.Kubeconfig)
@@ -56,9 +57,28 @@ func CreateClient(clientConnection componentbaseconfig.ClientConnectionConfigura
cfg = rest.AddUserAgent(cfg, userAgt)
}
return cfg, nil
}
func CreateClient(clientConnection componentbaseconfig.ClientConnectionConfiguration, userAgt string) (clientset.Interface, error) {
cfg, err := createConfig(clientConnection, userAgt)
if err != nil {
return nil, fmt.Errorf("unable to create config: %v", err)
}
return clientset.NewForConfig(cfg)
}
func CreateMetricsClient(clientConnection componentbaseconfig.ClientConnectionConfiguration, userAgt string) (metricsclient.Interface, error) {
cfg, err := createConfig(clientConnection, userAgt)
if err != nil {
return nil, fmt.Errorf("unable to create config: %v", err)
}
// Create the metrics clientset to access the metrics.k8s.io API
return metricsclient.NewForConfig(cfg)
}
func GetMasterFromKubeconfig(filename string) (string, error) {
config, err := clientcmd.LoadFromFile(filename)
if err != nil {

View File

@@ -23,43 +23,41 @@ import (
"strconv"
"time"
schedulingv1 "k8s.io/api/scheduling/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
schedulingv1 "k8s.io/api/scheduling/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilversion "k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
fakeclientset "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/events"
componentbaseconfig "k8s.io/component-base/config"
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilversion "k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
fakeclientset "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"sigs.k8s.io/descheduler/pkg/descheduler/client"
eutils "sigs.k8s.io/descheduler/pkg/descheduler/evictions/utils"
nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
"sigs.k8s.io/descheduler/pkg/tracing"
"sigs.k8s.io/descheduler/pkg/utils"
"sigs.k8s.io/descheduler/pkg/version"
"sigs.k8s.io/descheduler/cmd/descheduler/app/options"
"sigs.k8s.io/descheduler/metrics"
"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/client"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
eutils "sigs.k8s.io/descheduler/pkg/descheduler/evictions/utils"
"sigs.k8s.io/descheduler/pkg/descheduler/metricscollector"
nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
"sigs.k8s.io/descheduler/pkg/framework/pluginregistry"
frameworkprofile "sigs.k8s.io/descheduler/pkg/framework/profile"
frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
"sigs.k8s.io/descheduler/pkg/tracing"
"sigs.k8s.io/descheduler/pkg/utils"
"sigs.k8s.io/descheduler/pkg/version"
)
type eprunner func(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status
@@ -78,6 +76,7 @@ type descheduler struct {
eventRecorder events.EventRecorder
podEvictor *evictions.PodEvictor
podEvictionReactionFnc func(*fakeclientset.Clientset) func(action core.Action) (bool, runtime.Object, error)
metricsCollector *metricscollector.MetricsCollector
}
type informerResources struct {
@@ -152,6 +151,11 @@ func newDescheduler(rs *options.DeschedulerServer, deschedulerPolicy *api.Desche
WithMetricsEnabled(!rs.DisableMetrics),
)
var metricsCollector *metricscollector.MetricsCollector
if deschedulerPolicy.MetricsCollector.Enabled {
metricsCollector = metricscollector.NewMetricsCollector(rs.Client, rs.MetricsClient)
}
return &descheduler{
rs: rs,
ir: ir,
@@ -161,6 +165,7 @@ func newDescheduler(rs *options.DeschedulerServer, deschedulerPolicy *api.Desche
eventRecorder: eventRecorder,
podEvictor: podEvictor,
podEvictionReactionFnc: podEvictionReactionFnc,
metricsCollector: metricsCollector,
}, nil
}
@@ -240,6 +245,7 @@ func (d *descheduler) runProfiles(ctx context.Context, client clientset.Interfac
frameworkprofile.WithSharedInformerFactory(d.sharedInformerFactory),
frameworkprofile.WithPodEvictor(d.podEvictor),
frameworkprofile.WithGetPodsAssignedToNodeFnc(d.getPodsAssignedToNode),
frameworkprofile.WithMetricsCollector(d.metricsCollector),
)
if err != nil {
klog.ErrorS(err, "unable to create a profile", "profile", profile.Name)
@@ -304,6 +310,14 @@ func Run(ctx context.Context, rs *options.DeschedulerServer) error {
return err
}
if deschedulerPolicy.MetricsCollector.Enabled {
metricsClient, err := client.CreateMetricsClient(clientConnection, "descheduler")
if err != nil {
return err
}
rs.MetricsClient = metricsClient
}
runFn := func() error {
return RunDeschedulerStrategies(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion)
}
@@ -411,6 +425,12 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())
go func() {
klog.V(2).Infof("Starting metrics collector")
descheduler.metricsCollector.Run(ctx)
klog.V(2).Infof("Stopped metrics collector")
}()
wait.NonSlidingUntil(func() {
// A next context is created here intentionally to avoid nesting the spans via context.
sCtx, sSpan := tracing.Tracer().Start(ctx, "NonSlidingUntil")

View File

@@ -16,11 +16,16 @@ import (
fakeclientset "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/klog/v2"
"k8s.io/metrics/pkg/apis/metrics/v1beta1"
fakemetricsclient "k8s.io/metrics/pkg/client/clientset/versioned/fake"
utilptr "k8s.io/utils/ptr"
"sigs.k8s.io/descheduler/cmd/descheduler/app/options"
"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/metricscollector"
"sigs.k8s.io/descheduler/pkg/framework/pluginregistry"
"sigs.k8s.io/descheduler/pkg/framework/plugins/defaultevictor"
"sigs.k8s.io/descheduler/pkg/framework/plugins/nodeutilization"
"sigs.k8s.io/descheduler/pkg/framework/plugins/removeduplicates"
"sigs.k8s.io/descheduler/pkg/framework/plugins/removepodsviolatingnodetaints"
"sigs.k8s.io/descheduler/pkg/utils"
@@ -33,6 +38,7 @@ func initPluginRegistry() {
pluginregistry.Register(removeduplicates.PluginName, removeduplicates.New, &removeduplicates.RemoveDuplicates{}, &removeduplicates.RemoveDuplicatesArgs{}, removeduplicates.ValidateRemoveDuplicatesArgs, removeduplicates.SetDefaults_RemoveDuplicatesArgs, pluginregistry.PluginRegistry)
pluginregistry.Register(defaultevictor.PluginName, defaultevictor.New, &defaultevictor.DefaultEvictor{}, &defaultevictor.DefaultEvictorArgs{}, defaultevictor.ValidateDefaultEvictorArgs, defaultevictor.SetDefaults_DefaultEvictorArgs, pluginregistry.PluginRegistry)
pluginregistry.Register(removepodsviolatingnodetaints.PluginName, removepodsviolatingnodetaints.New, &removepodsviolatingnodetaints.RemovePodsViolatingNodeTaints{}, &removepodsviolatingnodetaints.RemovePodsViolatingNodeTaintsArgs{}, removepodsviolatingnodetaints.ValidateRemovePodsViolatingNodeTaintsArgs, removepodsviolatingnodetaints.SetDefaults_RemovePodsViolatingNodeTaintsArgs, pluginregistry.PluginRegistry)
pluginregistry.Register(nodeutilization.LowNodeUtilizationPluginName, nodeutilization.NewLowNodeUtilization, &nodeutilization.LowNodeUtilization{}, &nodeutilization.LowNodeUtilizationArgs{}, nodeutilization.ValidateLowNodeUtilizationArgs, nodeutilization.SetDefaults_LowNodeUtilizationArgs, pluginregistry.PluginRegistry)
}
func removePodsViolatingNodeTaintsPolicy() *api.DeschedulerPolicy {
@@ -99,6 +105,44 @@ func removeDuplicatesPolicy() *api.DeschedulerPolicy {
}
}
func lowNodeUtilizationPolicy(thresholds, targetThresholds api.ResourceThresholds, metricsEnabled bool) *api.DeschedulerPolicy {
return &api.DeschedulerPolicy{
Profiles: []api.DeschedulerProfile{
{
Name: "Profile",
PluginConfigs: []api.PluginConfig{
{
Name: nodeutilization.LowNodeUtilizationPluginName,
Args: &nodeutilization.LowNodeUtilizationArgs{
Thresholds: thresholds,
TargetThresholds: targetThresholds,
MetricsUtilization: nodeutilization.MetricsUtilization{
MetricsServer: metricsEnabled,
},
},
},
{
Name: defaultevictor.PluginName,
Args: &defaultevictor.DefaultEvictorArgs{},
},
},
Plugins: api.Plugins{
Filter: api.PluginSet{
Enabled: []string{
defaultevictor.PluginName,
},
},
Balance: api.PluginSet{
Enabled: []string{
nodeutilization.LowNodeUtilizationPluginName,
},
},
},
},
},
}
}
func initDescheduler(t *testing.T, ctx context.Context, internalDeschedulerPolicy *api.DeschedulerPolicy, objects ...runtime.Object) (*options.DeschedulerServer, *descheduler, *fakeclientset.Clientset) {
client := fakeclientset.NewSimpleClientset(objects...)
eventClient := fakeclientset.NewSimpleClientset(objects...)
@@ -539,3 +583,76 @@ func TestDeschedulingLimits(t *testing.T) {
})
}
}
func TestLoadAwareDescheduling(t *testing.T) {
initPluginRegistry()
ownerRef1 := test.GetReplicaSetOwnerRefList()
updatePod := func(pod *v1.Pod) {
pod.ObjectMeta.OwnerReferences = ownerRef1
}
ctx := context.Background()
node1 := test.BuildTestNode("n1", 2000, 3000, 10, taintNodeNoSchedule)
node2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
nodes := []*v1.Node{node1, node2}
p1 := test.BuildTestPod("p1", 300, 0, node1.Name, updatePod)
p2 := test.BuildTestPod("p2", 300, 0, node1.Name, updatePod)
p3 := test.BuildTestPod("p3", 300, 0, node1.Name, updatePod)
p4 := test.BuildTestPod("p4", 300, 0, node1.Name, updatePod)
p5 := test.BuildTestPod("p5", 300, 0, node1.Name, updatePod)
ctxCancel, cancel := context.WithCancel(ctx)
_, descheduler, client := initDescheduler(
t,
ctxCancel,
lowNodeUtilizationPolicy(
api.ResourceThresholds{
v1.ResourceCPU: 30,
v1.ResourcePods: 30,
},
api.ResourceThresholds{
v1.ResourceCPU: 50,
v1.ResourcePods: 50,
},
true, // enabled metrics utilization
),
node1, node2, p1, p2, p3, p4, p5)
defer cancel()
nodemetricses := []*v1beta1.NodeMetrics{
test.BuildNodeMetrics("n1", 2400, 3000),
test.BuildNodeMetrics("n2", 400, 0),
}
podmetricses := []*v1beta1.PodMetrics{
test.BuildPodMetrics("p1", 400, 0),
test.BuildPodMetrics("p2", 400, 0),
test.BuildPodMetrics("p3", 400, 0),
test.BuildPodMetrics("p4", 400, 0),
test.BuildPodMetrics("p5", 400, 0),
}
var metricsObjs []runtime.Object
for _, nodemetrics := range nodemetricses {
metricsObjs = append(metricsObjs, nodemetrics)
}
for _, podmetrics := range podmetricses {
metricsObjs = append(metricsObjs, podmetrics)
}
metricsClientset := fakemetricsclient.NewSimpleClientset(metricsObjs...)
descheduler.metricsCollector = metricscollector.NewMetricsCollector(client, metricsClientset)
descheduler.metricsCollector.Collect(ctx)
err := descheduler.runDeschedulerLoop(ctx, nodes)
if err != nil {
t.Fatalf("Unable to run a descheduling loop: %v", err)
}
totalEs := descheduler.podEvictor.TotalEvicted()
if totalEs != 2 {
t.Fatalf("Expected %v evictions in total, got %v instead", 2, totalEs)
}
t.Logf("Total evictions: %v", totalEs)
}

View File

@@ -0,0 +1,125 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metricscollector
import (
"context"
"fmt"
"math"
"sync"
"time"
"k8s.io/klog/v2"
metricsclient "k8s.io/metrics/pkg/client/clientset/versioned"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
utilptr "k8s.io/utils/ptr"
)
const (
beta float64 = 0.9
)
type MetricsCollector struct {
clientset kubernetes.Interface
metricsClientset metricsclient.Interface
nodes map[string]map[v1.ResourceName]*resource.Quantity
mu sync.Mutex
}
func NewMetricsCollector(clientset kubernetes.Interface, metricsClientset metricsclient.Interface) *MetricsCollector {
return &MetricsCollector{
clientset: clientset,
metricsClientset: metricsClientset,
nodes: make(map[string]map[v1.ResourceName]*resource.Quantity),
}
}
func (mc *MetricsCollector) Run(ctx context.Context) {
wait.NonSlidingUntil(func() {
mc.Collect(ctx)
}, 5*time.Second, ctx.Done())
}
func weightedAverage(prevValue, value int64) int64 {
return int64(math.Floor(beta*float64(prevValue) + (1-beta)*float64(value)))
}
func (mc *MetricsCollector) NodeUsage(node *v1.Node) (map[v1.ResourceName]*resource.Quantity, error) {
mc.mu.Lock()
defer mc.mu.Unlock()
if _, exists := mc.nodes[node.Name]; !exists {
klog.V(4).Infof("unable to find node %q in the collected metrics", node.Name)
return nil, fmt.Errorf("unable to find node %q in the collected metrics", node.Name)
}
return map[v1.ResourceName]*resource.Quantity{
v1.ResourceCPU: utilptr.To[resource.Quantity](mc.nodes[node.Name][v1.ResourceCPU].DeepCopy()),
v1.ResourceMemory: utilptr.To[resource.Quantity](mc.nodes[node.Name][v1.ResourceMemory].DeepCopy()),
}, nil
}
func (mc *MetricsCollector) MetricsClient() metricsclient.Interface {
return mc.metricsClientset
}
func (mc *MetricsCollector) Collect(ctx context.Context) error {
mc.mu.Lock()
defer mc.mu.Unlock()
nodes, err := mc.clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
if err != nil {
return fmt.Errorf("unable to list nodes: %v", err)
}
for _, node := range nodes.Items {
metrics, err := mc.metricsClientset.MetricsV1beta1().NodeMetricses().Get(context.TODO(), node.Name, metav1.GetOptions{})
if err != nil {
fmt.Printf("Error fetching metrics for node %s: %v\n", node.Name, err)
// No entry -> duplicate the previous value -> do nothing as beta*PV + (1-beta)*PV = PV
continue
}
if _, exists := mc.nodes[node.Name]; !exists {
mc.nodes[node.Name] = map[v1.ResourceName]*resource.Quantity{
v1.ResourceCPU: utilptr.To[resource.Quantity](metrics.Usage.Cpu().DeepCopy()),
v1.ResourceMemory: utilptr.To[resource.Quantity](metrics.Usage.Memory().DeepCopy()),
}
} else {
// get MilliValue to reduce loss of precision
mc.nodes[node.Name][v1.ResourceCPU].SetMilli(
weightedAverage(mc.nodes[node.Name][v1.ResourceCPU].MilliValue(), metrics.Usage.Cpu().MilliValue()),
)
mc.nodes[node.Name][v1.ResourceMemory].SetMilli(
weightedAverage(mc.nodes[node.Name][v1.ResourceMemory].MilliValue(), metrics.Usage.Memory().MilliValue()),
)
}
// Display CPU and memory usage
// fmt.Printf("%s: %vm, %vMi\n", node.Name, metrics.Usage.Cpu().MilliValue(), metrics.Usage.Memory().Value()/(1024*1024))
// fmt.Printf("%s: %vm, %vMi\n", node.Name, mc.nodes[node.Name][v1.ResourceCPU].MilliValue(), mc.nodes[node.Name][v1.ResourceMemory].Value()/(1024*1024))
}
fmt.Printf("--\n")
return nil
}

View File

@@ -0,0 +1,103 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metricscollector
import (
"context"
"os"
"testing"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
metricsclient "k8s.io/metrics/pkg/client/clientset/versioned"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/runtime/schema"
fakeclientset "k8s.io/client-go/kubernetes/fake"
fakemetricsclient "k8s.io/metrics/pkg/client/clientset/versioned/fake"
"sigs.k8s.io/descheduler/test"
)
func TestMetricsCollector1(t *testing.T) {
kubeconfig := os.Getenv("KUBECONFIG")
// Use the kubeconfig to build the Kubernetes client
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
panic(err.Error())
}
// Create the standard Kubernetes clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
// Create the metrics clientset to access the metrics.k8s.io API
metricsClientset, err := metricsclient.NewForConfig(config)
if err != nil {
panic(err.Error())
}
collector := NewMetricsCollector(clientset, metricsClientset)
collector.Run(context.TODO())
// collector.Collect(context.TODO())
}
func checkCpuNodeUsage(t *testing.T, usage map[v1.ResourceName]*resource.Quantity, millicpu int64) {
t.Logf("current node cpu usage: %v\n", usage[v1.ResourceCPU].MilliValue())
if usage[v1.ResourceCPU].MilliValue() != millicpu {
t.Fatalf("cpu node usage expected to be %v, got %v instead", millicpu, usage[v1.ResourceCPU].MilliValue())
}
}
func TestMetricsCollector2(t *testing.T) {
gvr := schema.GroupVersionResource{Group: "metrics.k8s.io", Version: "v1beta1", Resource: "nodemetricses"}
n1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
n2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
n3 := test.BuildTestNode("n3", 2000, 3000, 10, nil)
n1metrics := test.BuildNodeMetrics("n1", 400, 1714978816)
n2metrics := test.BuildNodeMetrics("n2", 1400, 1714978816)
n3metrics := test.BuildNodeMetrics("n3", 300, 1714978816)
clientset := fakeclientset.NewSimpleClientset(n1, n2, n3)
metricsClientset := fakemetricsclient.NewSimpleClientset(n1metrics, n2metrics, n3metrics)
t.Logf("Set initial node cpu usage to 1400")
collector := NewMetricsCollector(clientset, metricsClientset)
collector.Collect(context.TODO())
nodesUsage, _ := collector.NodeUsage(n2)
checkCpuNodeUsage(t, nodesUsage, 1400)
t.Logf("Set current node cpu usage to 500")
n2metrics.Usage[v1.ResourceCPU] = *resource.NewMilliQuantity(500, resource.DecimalSI)
metricsClientset.Tracker().Update(gvr, n2metrics, "")
collector.Collect(context.TODO())
nodesUsage, _ = collector.NodeUsage(n2)
checkCpuNodeUsage(t, nodesUsage, 1310)
t.Logf("Set current node cpu usage to 500")
n2metrics.Usage[v1.ResourceCPU] = *resource.NewMilliQuantity(900, resource.DecimalSI)
metricsClientset.Tracker().Update(gvr, n2metrics, "")
collector.Collect(context.TODO())
nodesUsage, _ = collector.NodeUsage(n2)
checkCpuNodeUsage(t, nodesUsage, 1268)
}

View File

@@ -218,7 +218,12 @@ func fitsRequest(nodeIndexer podutil.GetPodsAssignedToNodeFunc, pod *v1.Pod, nod
resourceNames = append(resourceNames, name)
}
availableResources, err := nodeAvailableResources(nodeIndexer, node, resourceNames)
availableResources, err := nodeAvailableResources(nodeIndexer, node, resourceNames,
func(pod *v1.Pod) (v1.ResourceList, error) {
req, _ := utils.PodRequestsAndLimits(pod)
return req, nil
},
)
if err != nil {
return false, err
}
@@ -239,12 +244,15 @@ func fitsRequest(nodeIndexer podutil.GetPodsAssignedToNodeFunc, pod *v1.Pod, nod
}
// nodeAvailableResources returns resources mapped to the quanitity available on the node.
func nodeAvailableResources(nodeIndexer podutil.GetPodsAssignedToNodeFunc, node *v1.Node, resourceNames []v1.ResourceName) (map[v1.ResourceName]*resource.Quantity, error) {
func nodeAvailableResources(nodeIndexer podutil.GetPodsAssignedToNodeFunc, node *v1.Node, resourceNames []v1.ResourceName, podUtilization podutil.PodUtilizationFnc) (map[v1.ResourceName]*resource.Quantity, error) {
podsOnNode, err := podutil.ListPodsOnANode(node.Name, nodeIndexer, nil)
if err != nil {
return nil, err
}
nodeUtilization := NodeUtilization(podsOnNode, resourceNames)
nodeUtilization, err := NodeUtilization(podsOnNode, resourceNames, podUtilization)
if err != nil {
return nil, err
}
remainingResources := map[v1.ResourceName]*resource.Quantity{
v1.ResourceCPU: resource.NewMilliQuantity(node.Status.Allocatable.Cpu().MilliValue()-nodeUtilization[v1.ResourceCPU].MilliValue(), resource.DecimalSI),
v1.ResourceMemory: resource.NewQuantity(node.Status.Allocatable.Memory().Value()-nodeUtilization[v1.ResourceMemory].Value(), resource.BinarySI),
@@ -265,31 +273,34 @@ func nodeAvailableResources(nodeIndexer podutil.GetPodsAssignedToNodeFunc, node
}
// NodeUtilization returns the resources requested by the given pods. Only resources supplied in the resourceNames parameter are calculated.
func NodeUtilization(pods []*v1.Pod, resourceNames []v1.ResourceName) map[v1.ResourceName]*resource.Quantity {
totalReqs := map[v1.ResourceName]*resource.Quantity{
func NodeUtilization(pods []*v1.Pod, resourceNames []v1.ResourceName, podUtilization podutil.PodUtilizationFnc) (map[v1.ResourceName]*resource.Quantity, error) {
totalUtilization := map[v1.ResourceName]*resource.Quantity{
v1.ResourceCPU: resource.NewMilliQuantity(0, resource.DecimalSI),
v1.ResourceMemory: resource.NewQuantity(0, resource.BinarySI),
v1.ResourcePods: resource.NewQuantity(int64(len(pods)), resource.DecimalSI),
}
for _, name := range resourceNames {
if !IsBasicResource(name) {
totalReqs[name] = resource.NewQuantity(0, resource.DecimalSI)
totalUtilization[name] = resource.NewQuantity(0, resource.DecimalSI)
}
}
for _, pod := range pods {
req, _ := utils.PodRequestsAndLimits(pod)
podUtil, err := podUtilization(pod)
if err != nil {
return nil, err
}
for _, name := range resourceNames {
quantity, ok := req[name]
quantity, ok := podUtil[name]
if ok && name != v1.ResourcePods {
// As Quantity.Add says: Add adds the provided y quantity to the current value. If the current value is zero,
// the format of the quantity will be updated to the format of y.
totalReqs[name].Add(quantity)
totalUtilization[name].Add(quantity)
}
}
}
return totalReqs
return totalUtilization, nil
}
// IsBasicResource checks if resource is basic native.

View File

@@ -39,6 +39,9 @@ type FilterFunc func(*v1.Pod) bool
// as input and returns the pods that assigned to the node.
type GetPodsAssignedToNodeFunc func(string, FilterFunc) ([]*v1.Pod, error)
// PodUtilizationFnc is a function for getting pod's utilization. E.g. requested resources of utilization from metrics.
type PodUtilizationFnc func(pod *v1.Pod) (v1.ResourceList, error)
// WrapFilterFuncs wraps a set of FilterFunc in one.
func WrapFilterFuncs(filters ...FilterFunc) FilterFunc {
return func(pod *v1.Pod) bool {

View File

@@ -8,6 +8,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
"sigs.k8s.io/descheduler/pkg/descheduler/metricscollector"
podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
)
@@ -18,6 +19,7 @@ type HandleImpl struct {
SharedInformerFactoryImpl informers.SharedInformerFactory
EvictorFilterImpl frameworktypes.EvictorPlugin
PodEvictorImpl *evictions.PodEvictor
MetricsCollectorImpl *metricscollector.MetricsCollector
}
var _ frameworktypes.Handle = &HandleImpl{}
@@ -26,6 +28,10 @@ func (hi *HandleImpl) ClientSet() clientset.Interface {
return hi.ClientsetImpl
}
func (hi *HandleImpl) MetricsCollector() *metricscollector.MetricsCollector {
return hi.MetricsCollectorImpl
}
func (hi *HandleImpl) GetPodsAssignedToNodeFunc() podutil.GetPodsAssignedToNodeFunc {
return hi.GetPodsAssignedToNodeFuncImpl
}

View File

@@ -44,6 +44,7 @@ type HighNodeUtilization struct {
underutilizationCriteria []interface{}
resourceNames []v1.ResourceName
targetThresholds api.ResourceThresholds
usageSnapshot usageClient
}
var _ frameworktypes.BalancePlugin = &HighNodeUtilization{}
@@ -84,6 +85,7 @@ func NewHighNodeUtilization(args runtime.Object, handle frameworktypes.Handle) (
targetThresholds: targetThresholds,
underutilizationCriteria: underutilizationCriteria,
podFilter: podFilter,
usageSnapshot: newRequestedUsageSnapshot(resourceNames, handle.GetPodsAssignedToNodeFunc()),
}, nil
}
@@ -94,9 +96,15 @@ func (h *HighNodeUtilization) Name() string {
// Balance extension point implementation for the plugin
func (h *HighNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status {
if err := h.usageSnapshot.capture(nodes); err != nil {
return &frameworktypes.Status{
Err: fmt.Errorf("error getting node usage: %v", err),
}
}
sourceNodes, highNodes := classifyNodes(
getNodeUsage(nodes, h.resourceNames, h.handle.GetPodsAssignedToNodeFunc()),
getNodeThresholds(nodes, h.args.Thresholds, h.targetThresholds, h.resourceNames, h.handle.GetPodsAssignedToNodeFunc(), false),
getNodeUsage(nodes, h.usageSnapshot),
getNodeThresholds(nodes, h.args.Thresholds, h.targetThresholds, h.resourceNames, false, h.usageSnapshot),
func(node *v1.Node, usage NodeUsage, threshold NodeThresholds) bool {
return isNodeWithLowUtilization(usage, threshold.lowResourceThreshold)
},
@@ -152,7 +160,9 @@ func (h *HighNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *fr
evictions.EvictOptions{StrategyName: HighNodeUtilizationPluginName},
h.podFilter,
h.resourceNames,
continueEvictionCond)
continueEvictionCond,
h.usageSnapshot,
)
return nil
}

View File

@@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
@@ -43,6 +44,7 @@ type LowNodeUtilization struct {
underutilizationCriteria []interface{}
overutilizationCriteria []interface{}
resourceNames []v1.ResourceName
usageSnapshot usageClient
}
var _ frameworktypes.BalancePlugin = &LowNodeUtilization{}
@@ -85,13 +87,23 @@ func NewLowNodeUtilization(args runtime.Object, handle frameworktypes.Handle) (f
return nil, fmt.Errorf("error initializing pod filter function: %v", err)
}
resourceNames := getResourceNames(lowNodeUtilizationArgsArgs.Thresholds)
var usageSnapshot usageClient
if lowNodeUtilizationArgsArgs.MetricsUtilization.MetricsServer {
usageSnapshot = newActualUsageSnapshot(resourceNames, handle.GetPodsAssignedToNodeFunc(), handle.MetricsCollector())
} else {
usageSnapshot = newRequestedUsageSnapshot(resourceNames, handle.GetPodsAssignedToNodeFunc())
}
return &LowNodeUtilization{
handle: handle,
args: lowNodeUtilizationArgsArgs,
underutilizationCriteria: underutilizationCriteria,
overutilizationCriteria: overutilizationCriteria,
resourceNames: getResourceNames(lowNodeUtilizationArgsArgs.Thresholds),
resourceNames: resourceNames,
podFilter: podFilter,
usageSnapshot: usageSnapshot,
}, nil
}
@@ -102,9 +114,15 @@ func (l *LowNodeUtilization) Name() string {
// Balance extension point implementation for the plugin
func (l *LowNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status {
if err := l.usageSnapshot.capture(nodes); err != nil {
return &frameworktypes.Status{
Err: fmt.Errorf("error getting node usage: %v", err),
}
}
lowNodes, sourceNodes := classifyNodes(
getNodeUsage(nodes, l.resourceNames, l.handle.GetPodsAssignedToNodeFunc()),
getNodeThresholds(nodes, l.args.Thresholds, l.args.TargetThresholds, l.resourceNames, l.handle.GetPodsAssignedToNodeFunc(), l.args.UseDeviationThresholds),
getNodeUsage(nodes, l.usageSnapshot),
getNodeThresholds(nodes, l.args.Thresholds, l.args.TargetThresholds, l.resourceNames, l.args.UseDeviationThresholds, l.usageSnapshot),
// The node has to be schedulable (to be able to move workload there)
func(node *v1.Node, usage NodeUsage, threshold NodeThresholds) bool {
if nodeutil.IsNodeUnschedulable(node) {
@@ -172,7 +190,9 @@ func (l *LowNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *fra
evictions.EvictOptions{StrategyName: LowNodeUtilizationPluginName},
l.podFilter,
l.resourceNames,
continueEvictionCond)
continueEvictionCond,
l.usageSnapshot,
)
return nil
}

View File

@@ -18,8 +18,12 @@ package nodeutilization
import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"testing"
"time"
"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/framework/plugins/defaultevictor"
@@ -32,10 +36,18 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/metrics/pkg/apis/metrics/v1beta1"
fakemetricsclient "k8s.io/metrics/pkg/client/clientset/versioned/fake"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
"sigs.k8s.io/descheduler/pkg/descheduler/metricscollector"
"sigs.k8s.io/descheduler/pkg/utils"
"sigs.k8s.io/descheduler/test"
promapi "github.com/prometheus/client_golang/api"
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
)
func TestLowNodeUtilization(t *testing.T) {
@@ -48,14 +60,17 @@ func TestLowNodeUtilization(t *testing.T) {
notMatchingNodeSelectorValue := "east"
testCases := []struct {
name string
useDeviationThresholds bool
thresholds, targetThresholds api.ResourceThresholds
nodes []*v1.Node
pods []*v1.Pod
expectedPodsEvicted uint
evictedPods []string
evictableNamespaces *api.Namespaces
name string
useDeviationThresholds bool
thresholds, targetThresholds api.ResourceThresholds
nodes []*v1.Node
pods []*v1.Pod
nodemetricses []*v1beta1.NodeMetrics
podmetricses []*v1beta1.PodMetrics
expectedPodsEvicted uint
expectedPodsWithMetricsEvicted uint
evictedPods []string
evictableNamespaces *api.Namespaces
}{
{
name: "no evictable pods",
@@ -103,7 +118,20 @@ func TestLowNodeUtilization(t *testing.T) {
}),
test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef),
},
expectedPodsEvicted: 0,
nodemetricses: []*v1beta1.NodeMetrics{
test.BuildNodeMetrics(n1NodeName, 2401, 1714978816),
test.BuildNodeMetrics(n2NodeName, 401, 1714978816),
test.BuildNodeMetrics(n3NodeName, 10, 1714978816),
},
podmetricses: []*v1beta1.PodMetrics{
test.BuildPodMetrics("p1", 401, 0),
test.BuildPodMetrics("p2", 401, 0),
test.BuildPodMetrics("p3", 401, 0),
test.BuildPodMetrics("p4", 401, 0),
test.BuildPodMetrics("p5", 401, 0),
},
expectedPodsEvicted: 0,
expectedPodsWithMetricsEvicted: 0,
},
{
name: "without priorities",
@@ -153,7 +181,20 @@ func TestLowNodeUtilization(t *testing.T) {
}),
test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef),
},
expectedPodsEvicted: 4,
nodemetricses: []*v1beta1.NodeMetrics{
test.BuildNodeMetrics(n1NodeName, 3201, 0),
test.BuildNodeMetrics(n2NodeName, 401, 0),
test.BuildNodeMetrics(n3NodeName, 11, 0),
},
podmetricses: []*v1beta1.PodMetrics{
test.BuildPodMetrics("p1", 401, 0),
test.BuildPodMetrics("p2", 401, 0),
test.BuildPodMetrics("p3", 401, 0),
test.BuildPodMetrics("p4", 401, 0),
test.BuildPodMetrics("p5", 401, 0),
},
expectedPodsEvicted: 4,
expectedPodsWithMetricsEvicted: 4,
},
{
name: "without priorities, but excluding namespaces",
@@ -218,12 +259,25 @@ func TestLowNodeUtilization(t *testing.T) {
}),
test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef),
},
nodemetricses: []*v1beta1.NodeMetrics{
test.BuildNodeMetrics(n1NodeName, 3201, 0),
test.BuildNodeMetrics(n2NodeName, 401, 0),
test.BuildNodeMetrics(n3NodeName, 11, 0),
},
podmetricses: []*v1beta1.PodMetrics{
test.BuildPodMetrics("p1", 401, 0),
test.BuildPodMetrics("p2", 401, 0),
test.BuildPodMetrics("p3", 401, 0),
test.BuildPodMetrics("p4", 401, 0),
test.BuildPodMetrics("p5", 401, 0),
},
evictableNamespaces: &api.Namespaces{
Exclude: []string{
"namespace1",
},
},
expectedPodsEvicted: 0,
expectedPodsEvicted: 0,
expectedPodsWithMetricsEvicted: 0,
},
{
name: "without priorities, but include only default namespace",
@@ -283,12 +337,25 @@ func TestLowNodeUtilization(t *testing.T) {
}),
test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef),
},
nodemetricses: []*v1beta1.NodeMetrics{
test.BuildNodeMetrics(n1NodeName, 3201, 0),
test.BuildNodeMetrics(n2NodeName, 401, 0),
test.BuildNodeMetrics(n3NodeName, 11, 0),
},
podmetricses: []*v1beta1.PodMetrics{
test.BuildPodMetrics("p1", 401, 0),
test.BuildPodMetrics("p2", 401, 0),
test.BuildPodMetrics("p3", 401, 0),
test.BuildPodMetrics("p4", 401, 0),
test.BuildPodMetrics("p5", 401, 0),
},
evictableNamespaces: &api.Namespaces{
Include: []string{
"default",
},
},
expectedPodsEvicted: 2,
expectedPodsEvicted: 2,
expectedPodsWithMetricsEvicted: 2,
},
{
name: "without priorities stop when cpu capacity is depleted",
@@ -306,14 +373,14 @@ func TestLowNodeUtilization(t *testing.T) {
test.BuildTestNode(n3NodeName, 4000, 3000, 10, test.SetNodeUnschedulable),
},
pods: []*v1.Pod{
test.BuildTestPod("p1", 400, 300, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p2", 400, 300, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p3", 400, 300, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p4", 400, 300, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p5", 400, 300, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p1", 400, 0, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p2", 400, 0, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p3", 400, 0, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p4", 400, 0, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p5", 400, 0, n1NodeName, test.SetRSOwnerRef),
// These won't be evicted.
test.BuildTestPod("p6", 400, 300, n1NodeName, test.SetDSOwnerRef),
test.BuildTestPod("p7", 400, 300, n1NodeName, func(pod *v1.Pod) {
test.BuildTestPod("p6", 400, 0, n1NodeName, test.SetDSOwnerRef),
test.BuildTestPod("p7", 400, 0, n1NodeName, func(pod *v1.Pod) {
// A pod with local storage.
test.SetNormalOwnerRef(pod)
pod.Spec.Volumes = []v1.Volume{
@@ -330,17 +397,29 @@ func TestLowNodeUtilization(t *testing.T) {
// A Mirror Pod.
pod.Annotations = test.GetMirrorPodAnnotation()
}),
test.BuildTestPod("p8", 400, 300, n1NodeName, func(pod *v1.Pod) {
test.BuildTestPod("p8", 400, 0, n1NodeName, func(pod *v1.Pod) {
// A Critical Pod.
test.SetNormalOwnerRef(pod)
pod.Namespace = "kube-system"
priority := utils.SystemCriticalPriority
pod.Spec.Priority = &priority
}),
test.BuildTestPod("p9", 400, 2100, n2NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef),
},
// 4 pods available for eviction based on v1.ResourcePods, only 3 pods can be evicted before cpu is depleted
expectedPodsEvicted: 3,
nodemetricses: []*v1beta1.NodeMetrics{
test.BuildNodeMetrics(n1NodeName, 3201, 0),
test.BuildNodeMetrics(n2NodeName, 401, 0),
test.BuildNodeMetrics(n3NodeName, 0, 0),
},
podmetricses: []*v1beta1.PodMetrics{
test.BuildPodMetrics("p1", 401, 0),
test.BuildPodMetrics("p2", 401, 0),
test.BuildPodMetrics("p3", 401, 0),
test.BuildPodMetrics("p4", 401, 0),
test.BuildPodMetrics("p5", 401, 0),
},
expectedPodsEvicted: 4,
expectedPodsWithMetricsEvicted: 4,
},
{
name: "with priorities",
@@ -410,7 +489,20 @@ func TestLowNodeUtilization(t *testing.T) {
}),
test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef),
},
expectedPodsEvicted: 4,
nodemetricses: []*v1beta1.NodeMetrics{
test.BuildNodeMetrics(n1NodeName, 3201, 0),
test.BuildNodeMetrics(n2NodeName, 401, 0),
test.BuildNodeMetrics(n3NodeName, 11, 0),
},
podmetricses: []*v1beta1.PodMetrics{
test.BuildPodMetrics("p1", 401, 0),
test.BuildPodMetrics("p2", 401, 0),
test.BuildPodMetrics("p3", 401, 0),
test.BuildPodMetrics("p4", 401, 0),
test.BuildPodMetrics("p5", 401, 0),
},
expectedPodsEvicted: 4,
expectedPodsWithMetricsEvicted: 4,
},
{
name: "without priorities evicting best-effort pods only",
@@ -478,8 +570,21 @@ func TestLowNodeUtilization(t *testing.T) {
}),
test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef),
},
expectedPodsEvicted: 4,
evictedPods: []string{"p1", "p2", "p4", "p5"},
nodemetricses: []*v1beta1.NodeMetrics{
test.BuildNodeMetrics(n1NodeName, 3201, 0),
test.BuildNodeMetrics(n2NodeName, 401, 0),
test.BuildNodeMetrics(n3NodeName, 11, 0),
},
podmetricses: []*v1beta1.PodMetrics{
test.BuildPodMetrics("p1", 401, 0),
test.BuildPodMetrics("p2", 401, 0),
test.BuildPodMetrics("p3", 401, 0),
test.BuildPodMetrics("p4", 401, 0),
test.BuildPodMetrics("p5", 401, 0),
},
expectedPodsEvicted: 4,
expectedPodsWithMetricsEvicted: 4,
evictedPods: []string{"p1", "p2", "p4", "p5"},
},
{
name: "with extended resource",
@@ -558,8 +663,21 @@ func TestLowNodeUtilization(t *testing.T) {
test.SetPodExtendedResourceRequest(pod, extendedResource, 1)
}),
},
nodemetricses: []*v1beta1.NodeMetrics{
test.BuildNodeMetrics(n1NodeName, 3201, 0),
test.BuildNodeMetrics(n2NodeName, 401, 0),
test.BuildNodeMetrics(n3NodeName, 11, 0),
},
podmetricses: []*v1beta1.PodMetrics{
test.BuildPodMetrics("p1", 401, 0),
test.BuildPodMetrics("p2", 401, 0),
test.BuildPodMetrics("p3", 401, 0),
test.BuildPodMetrics("p4", 401, 0),
test.BuildPodMetrics("p5", 401, 0),
},
// 4 pods available for eviction based on v1.ResourcePods, only 3 pods can be evicted before extended resource is depleted
expectedPodsEvicted: 3,
expectedPodsEvicted: 3,
expectedPodsWithMetricsEvicted: 0,
},
{
name: "with extended resource in some of nodes",
@@ -586,8 +704,21 @@ func TestLowNodeUtilization(t *testing.T) {
}),
test.BuildTestPod("p9", 0, 0, n2NodeName, test.SetRSOwnerRef),
},
nodemetricses: []*v1beta1.NodeMetrics{
test.BuildNodeMetrics(n1NodeName, 3201, 0),
test.BuildNodeMetrics(n2NodeName, 401, 0),
test.BuildNodeMetrics(n3NodeName, 11, 0),
},
podmetricses: []*v1beta1.PodMetrics{
test.BuildPodMetrics("p1", 401, 0),
test.BuildPodMetrics("p2", 401, 0),
test.BuildPodMetrics("p3", 401, 0),
test.BuildPodMetrics("p4", 401, 0),
test.BuildPodMetrics("p5", 401, 0),
},
// 0 pods available for eviction because there's no enough extended resource in node2
expectedPodsEvicted: 0,
expectedPodsEvicted: 0,
expectedPodsWithMetricsEvicted: 0,
},
{
name: "without priorities, but only other node is unschedulable",
@@ -636,7 +767,19 @@ func TestLowNodeUtilization(t *testing.T) {
pod.Spec.Priority = &priority
}),
},
expectedPodsEvicted: 0,
nodemetricses: []*v1beta1.NodeMetrics{
test.BuildNodeMetrics(n1NodeName, 3201, 0),
test.BuildNodeMetrics(n2NodeName, 401, 0),
},
podmetricses: []*v1beta1.PodMetrics{
test.BuildPodMetrics("p1", 401, 0),
test.BuildPodMetrics("p2", 401, 0),
test.BuildPodMetrics("p3", 401, 0),
test.BuildPodMetrics("p4", 401, 0),
test.BuildPodMetrics("p5", 401, 0),
},
expectedPodsEvicted: 0,
expectedPodsWithMetricsEvicted: 0,
},
{
name: "without priorities, but only other node doesn't match pod node selector for p4 and p5",
@@ -701,7 +844,17 @@ func TestLowNodeUtilization(t *testing.T) {
pod.Spec.Priority = &priority
}),
},
expectedPodsEvicted: 3,
nodemetricses: []*v1beta1.NodeMetrics{
test.BuildNodeMetrics(n1NodeName, 3201, 0),
test.BuildNodeMetrics(n2NodeName, 401, 0),
},
podmetricses: []*v1beta1.PodMetrics{
test.BuildPodMetrics("p1", 401, 0),
test.BuildPodMetrics("p2", 401, 0),
test.BuildPodMetrics("p3", 401, 0),
},
expectedPodsEvicted: 3,
expectedPodsWithMetricsEvicted: 3,
},
{
name: "without priorities, but only other node doesn't match pod node affinity for p4 and p5",
@@ -795,7 +948,17 @@ func TestLowNodeUtilization(t *testing.T) {
}),
test.BuildTestPod("p9", 0, 0, n2NodeName, test.SetRSOwnerRef),
},
expectedPodsEvicted: 3,
nodemetricses: []*v1beta1.NodeMetrics{
test.BuildNodeMetrics(n1NodeName, 3201, 0),
test.BuildNodeMetrics(n2NodeName, 401, 0),
},
podmetricses: []*v1beta1.PodMetrics{
test.BuildPodMetrics("p1", 401, 0),
test.BuildPodMetrics("p2", 401, 0),
test.BuildPodMetrics("p3", 401, 0),
},
expectedPodsEvicted: 3,
expectedPodsWithMetricsEvicted: 3,
},
{
name: "deviation thresholds",
@@ -847,71 +1010,210 @@ func TestLowNodeUtilization(t *testing.T) {
}),
test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef),
},
expectedPodsEvicted: 2,
evictedPods: []string{},
nodemetricses: []*v1beta1.NodeMetrics{
test.BuildNodeMetrics(n1NodeName, 3201, 0),
test.BuildNodeMetrics(n2NodeName, 401, 0),
test.BuildNodeMetrics(n3NodeName, 11, 0),
},
podmetricses: []*v1beta1.PodMetrics{
test.BuildPodMetrics("p1", 401, 0),
test.BuildPodMetrics("p2", 401, 0),
test.BuildPodMetrics("p3", 401, 0),
test.BuildPodMetrics("p4", 401, 0),
test.BuildPodMetrics("p5", 401, 0),
},
expectedPodsEvicted: 2,
expectedPodsWithMetricsEvicted: 2,
evictedPods: []string{},
},
{
name: "without priorities different evictions for requested and actual resources",
thresholds: api.ResourceThresholds{
v1.ResourceCPU: 30,
v1.ResourcePods: 30,
},
targetThresholds: api.ResourceThresholds{
v1.ResourceCPU: 50,
v1.ResourcePods: 50,
},
nodes: []*v1.Node{
test.BuildTestNode(n1NodeName, 4000, 3000, 9, nil),
test.BuildTestNode(n2NodeName, 4000, 3000, 10, func(node *v1.Node) {
node.ObjectMeta.Labels = map[string]string{
nodeSelectorKey: notMatchingNodeSelectorValue,
}
}),
},
pods: []*v1.Pod{
test.BuildTestPod("p1", 400, 0, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p2", 400, 0, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p3", 400, 0, n1NodeName, test.SetRSOwnerRef),
// These won't be evicted.
test.BuildTestPod("p4", 400, 0, n1NodeName, func(pod *v1.Pod) {
// A pod with affinity to run in the "west" datacenter upon scheduling
test.SetNormalOwnerRef(pod)
pod.Spec.Affinity = &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: nodeSelectorKey,
Operator: "In",
Values: []string{nodeSelectorValue},
},
},
},
},
},
},
}
}),
test.BuildTestPod("p5", 400, 0, n1NodeName, func(pod *v1.Pod) {
// A pod with affinity to run in the "west" datacenter upon scheduling
test.SetNormalOwnerRef(pod)
pod.Spec.Affinity = &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: nodeSelectorKey,
Operator: "In",
Values: []string{nodeSelectorValue},
},
},
},
},
},
},
}
}),
test.BuildTestPod("p6", 400, 0, n1NodeName, test.SetDSOwnerRef),
test.BuildTestPod("p7", 400, 0, n1NodeName, func(pod *v1.Pod) {
// A pod with local storage.
test.SetNormalOwnerRef(pod)
pod.Spec.Volumes = []v1.Volume{
{
Name: "sample",
VolumeSource: v1.VolumeSource{
HostPath: &v1.HostPathVolumeSource{Path: "somePath"},
EmptyDir: &v1.EmptyDirVolumeSource{
SizeLimit: resource.NewQuantity(int64(10), resource.BinarySI),
},
},
},
}
// A Mirror Pod.
pod.Annotations = test.GetMirrorPodAnnotation()
}),
test.BuildTestPod("p8", 400, 0, n1NodeName, func(pod *v1.Pod) {
// A Critical Pod.
test.SetNormalOwnerRef(pod)
pod.Namespace = "kube-system"
priority := utils.SystemCriticalPriority
pod.Spec.Priority = &priority
}),
test.BuildTestPod("p9", 0, 0, n2NodeName, test.SetRSOwnerRef),
},
nodemetricses: []*v1beta1.NodeMetrics{
test.BuildNodeMetrics(n1NodeName, 3201, 0),
test.BuildNodeMetrics(n2NodeName, 401, 0),
},
podmetricses: []*v1beta1.PodMetrics{
test.BuildPodMetrics("p1", 801, 0),
test.BuildPodMetrics("p2", 801, 0),
test.BuildPodMetrics("p3", 801, 0),
},
expectedPodsEvicted: 3,
expectedPodsWithMetricsEvicted: 2,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testFnc := func(metricsEnabled bool, expectedPodsEvicted uint) func(t *testing.T) {
return func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var objs []runtime.Object
for _, node := range tc.nodes {
objs = append(objs, node)
}
for _, pod := range tc.pods {
objs = append(objs, pod)
}
fakeClient := fake.NewSimpleClientset(objs...)
var objs []runtime.Object
for _, node := range tc.nodes {
objs = append(objs, node)
}
for _, pod := range tc.pods {
objs = append(objs, pod)
}
var metricsObjs []runtime.Object
for _, nodemetrics := range tc.nodemetricses {
metricsObjs = append(metricsObjs, nodemetrics)
}
for _, podmetrics := range tc.podmetricses {
metricsObjs = append(metricsObjs, podmetrics)
}
podsForEviction := make(map[string]struct{})
for _, pod := range tc.evictedPods {
podsForEviction[pod] = struct{}{}
}
fakeClient := fake.NewSimpleClientset(objs...)
metricsClientset := fakemetricsclient.NewSimpleClientset(metricsObjs...)
collector := metricscollector.NewMetricsCollector(fakeClient, metricsClientset)
err := collector.Collect(ctx)
if err != nil {
t.Fatalf("unable to collect metrics: %v", err)
}
evictionFailed := false
if len(tc.evictedPods) > 0 {
fakeClient.Fake.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
getAction := action.(core.CreateAction)
obj := getAction.GetObject()
if eviction, ok := obj.(*policy.Eviction); ok {
if _, exists := podsForEviction[eviction.Name]; exists {
return true, obj, nil
podsForEviction := make(map[string]struct{})
for _, pod := range tc.evictedPods {
podsForEviction[pod] = struct{}{}
}
evictionFailed := false
if len(tc.evictedPods) > 0 {
fakeClient.Fake.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
getAction := action.(core.CreateAction)
obj := getAction.GetObject()
if eviction, ok := obj.(*policy.Eviction); ok {
if _, exists := podsForEviction[eviction.Name]; exists {
return true, obj, nil
}
evictionFailed = true
return true, nil, fmt.Errorf("pod %q was unexpectedly evicted", eviction.Name)
}
evictionFailed = true
return true, nil, fmt.Errorf("pod %q was unexpectedly evicted", eviction.Name)
}
return true, obj, nil
})
}
return true, obj, nil
})
}
handle, podEvictor, err := frameworktesting.InitFrameworkHandle(ctx, fakeClient, nil, defaultevictor.DefaultEvictorArgs{NodeFit: true}, nil)
if err != nil {
t.Fatalf("Unable to initialize a framework handle: %v", err)
}
handle, podEvictor, err := frameworktesting.InitFrameworkHandle(ctx, fakeClient, nil, defaultevictor.DefaultEvictorArgs{NodeFit: true}, nil)
if err != nil {
t.Fatalf("Unable to initialize a framework handle: %v", err)
}
handle.MetricsCollectorImpl = collector
plugin, err := NewLowNodeUtilization(&LowNodeUtilizationArgs{
Thresholds: tc.thresholds,
TargetThresholds: tc.targetThresholds,
UseDeviationThresholds: tc.useDeviationThresholds,
EvictableNamespaces: tc.evictableNamespaces,
},
handle)
if err != nil {
t.Fatalf("Unable to initialize the plugin: %v", err)
}
plugin.(frameworktypes.BalancePlugin).Balance(ctx, tc.nodes)
plugin, err := NewLowNodeUtilization(&LowNodeUtilizationArgs{
Thresholds: tc.thresholds,
TargetThresholds: tc.targetThresholds,
UseDeviationThresholds: tc.useDeviationThresholds,
EvictableNamespaces: tc.evictableNamespaces,
MetricsUtilization: MetricsUtilization{
MetricsServer: metricsEnabled,
},
},
handle)
if err != nil {
t.Fatalf("Unable to initialize the plugin: %v", err)
}
plugin.(frameworktypes.BalancePlugin).Balance(ctx, tc.nodes)
podsEvicted := podEvictor.TotalEvicted()
if tc.expectedPodsEvicted != podsEvicted {
t.Errorf("Expected %v pods to be evicted but %v got evicted", tc.expectedPodsEvicted, podsEvicted)
podsEvicted := podEvictor.TotalEvicted()
if expectedPodsEvicted != podsEvicted {
t.Errorf("Expected %v pods to be evicted but %v got evicted", expectedPodsEvicted, podsEvicted)
}
if evictionFailed {
t.Errorf("Pod evictions failed unexpectedly")
}
}
if evictionFailed {
t.Errorf("Pod evictions failed unexpectedly")
}
})
}
t.Run(tc.name, testFnc(false, tc.expectedPodsEvicted))
t.Run(tc.name+" with metrics enabled", testFnc(true, tc.expectedPodsWithMetricsEvicted))
}
}
@@ -1067,3 +1369,62 @@ func TestLowNodeUtilizationWithTaints(t *testing.T) {
})
}
}
func TestLowNodeUtilizationWithMetrics(t *testing.T) {
return
roundTripper := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
TLSHandshakeTimeout: 10 * time.Second,
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
AuthToken := "eyJhbGciOiJSUzI1NiIsImtpZCI6IkNoTW9tT2w2cWtzR2V0dURZdjBqdnBSdmdWM29lWmc3dWpfNW0yaDc2NHMifQ.eyJhdWQiOlsiaHR0cHM6Ly9rdWJlcm5ldGVzLmRlZmF1bHQuc3ZjIl0sImV4cCI6MTcyODk5MjY3NywiaWF0IjoxNzI4OTg5MDc3LCJpc3MiOiJodHRwczovL2t1YmVybmV0ZXMuZGVmYXVsdC5zdmMiLCJqdGkiOiJkNDY3ZjVmMy0xNGVmLTRkMjItOWJkNC1jMGM1Mzk3NzYyZDgiLCJrdWJlcm5ldGVzLmlvIjp7Im5hbWVzcGFjZSI6Im9wZW5zaGlmdC1tb25pdG9yaW5nIiwic2VydmljZWFjY291bnQiOnsibmFtZSI6InByb21ldGhldXMtazhzIiwidWlkIjoiNjY4NDllMGItYTAwZC00NjUzLWE5NTItNThiYTE1MTk4NTlkIn19LCJuYmYiOjE3Mjg5ODkwNzcsInN1YiI6InN5c3RlbTpzZXJ2aWNlYWNjb3VudDpvcGVuc2hpZnQtbW9uaXRvcmluZzpwcm9tZXRoZXVzLWs4cyJ9.J1i6-oRAC9J8mqrlZPKGA-CU5PbUzhm2QxAWFnu65-NXR3e252mesybwtjkwxUtTLKrsYHQXwEsG5rGcQsvMcGK9RC9y5z33DFj8tPPwOGLJYJ-s5cTImTqKtWRXzTlcrsrUYTYApfrOsEyXwyfDow4PCslZjR3cd5FMRbvXNqHLg26nG_smApR4wc6kXy7xxlRuGhxu-dUiscQP56njboOK61JdTG8F3FgOayZnKk1jGeVdIhXClqGWJyokk-ZM3mMK1MxzGXY0tLbe37V4B7g3NDiH651BUcicfDSky46yfcAYxMDbZgpK2TByWApAllN0wixz2WsFfyBVu_Q5xtZ9Gi9BUHSa5ioRiBK346W4Bdmr9ala5ldIXDa59YE7UB34DsCHyqvzRx_Sj76hLzy2jSOk7RsL0fM8sDoJL4ROdi-3Jtr5uPY593I8H8qeQvFS6PQfm0bUZqVKrrLoCK_uk9guH4a6K27SlD-Utk3dpsjbmrwcjBxm-zd_LE9YyQ734My00Pcy9D5eNio3gESjGsHqGFc_haq4ZCiVOCkbdmABjpPEL6K7bs1GMZbHt1CONL0-LzymM8vgGNj0grjpG8-5AF8ZuSqR7pbZSV_NO2nKkmrwpILCw0Joqp6V3C9pP9nXWHIDyVMxMK870zxzt_qCoPRLCAujQQn6e0U"
client, err := promapi.NewClient(promapi.Config{
Address: "https://prometheus-k8s-openshift-monitoring.apps.jchaloup-20241015-3.group-b.devcluster.openshift.com",
RoundTripper: config.NewAuthorizationCredentialsRoundTripper("Bearer", config.NewInlineSecret(AuthToken), roundTripper),
})
if err != nil {
t.Fatalf("prom client error: %v", err)
}
// pod:container_cpu_usage:sum
// container_memory_usage_bytes
v1api := promv1.NewAPI(client)
ctx := context.TODO()
// promQuery := "avg_over_time(kube_pod_container_resource_requests[1m])"
promQuery := "kube_pod_container_resource_requests"
results, warnings, err := v1api.Query(ctx, promQuery, time.Now())
fmt.Printf("results: %#v\n", results)
for _, sample := range results.(model.Vector) {
fmt.Printf("sample: %#v\n", sample)
}
fmt.Printf("warnings: %v\n", warnings)
fmt.Printf("err: %v\n", err)
result := model.Value(
&model.Vector{
&model.Sample{
Metric: model.Metric{
"container": "kube-controller-manager",
"endpoint": "https-main",
"job": "kube-state-metrics",
"namespace": "openshift-kube-controller-manager",
"node": "ip-10-0-72-168.us-east-2.compute.internal",
"pod": "kube-controller-manager-ip-10-0-72-168.us-east-2.compute.internal",
"resource": "cpu",
"service": "kube-state-metrics",
"uid": "ae46c09f-ade7-4133-9ee8-cf45ac78ca6d",
"unit": "core",
},
Value: 0.06,
Timestamp: 1728991761711,
},
},
)
fmt.Printf("result: %#v\n", result)
}

View File

@@ -78,14 +78,14 @@ func getNodeThresholds(
nodes []*v1.Node,
lowThreshold, highThreshold api.ResourceThresholds,
resourceNames []v1.ResourceName,
getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc,
useDeviationThresholds bool,
usageClient usageClient,
) map[string]NodeThresholds {
nodeThresholdsMap := map[string]NodeThresholds{}
averageResourceUsagePercent := api.ResourceThresholds{}
if useDeviationThresholds {
averageResourceUsagePercent = averageNodeBasicresources(nodes, getPodsAssignedToNode, resourceNames)
averageResourceUsagePercent = averageNodeBasicresources(nodes, usageClient)
}
for _, node := range nodes {
@@ -121,22 +121,15 @@ func getNodeThresholds(
func getNodeUsage(
nodes []*v1.Node,
resourceNames []v1.ResourceName,
getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc,
usageClient usageClient,
) []NodeUsage {
var nodeUsageList []NodeUsage
for _, node := range nodes {
pods, err := podutil.ListPodsOnANode(node.Name, getPodsAssignedToNode, nil)
if err != nil {
klog.V(2).InfoS("Node will not be processed, error accessing its pods", "node", klog.KObj(node), "err", err)
continue
}
nodeUsageList = append(nodeUsageList, NodeUsage{
node: node,
usage: nodeutil.NodeUtilization(pods, resourceNames),
allPods: pods,
usage: usageClient.nodeUtilization(node.Name),
allPods: usageClient.pods(node.Name),
})
}
@@ -226,6 +219,7 @@ func evictPodsFromSourceNodes(
podFilter func(pod *v1.Pod) bool,
resourceNames []v1.ResourceName,
continueEviction continueEvictionCond,
usageSnapshot usageClient,
) {
// upper bound on total number of pods/cpu/memory and optional extended resources to be moved
totalAvailableUsage := map[v1.ResourceName]*resource.Quantity{
@@ -243,6 +237,10 @@ func evictPodsFromSourceNodes(
totalAvailableUsage[name] = resource.NewQuantity(0, resource.DecimalSI)
}
totalAvailableUsage[name].Add(*node.thresholds.highResourceThreshold[name])
if _, exists := node.usage[name]; !exists {
klog.Errorf("unable to find %q resource in node's %q usage, terminating eviction", name, node.node.Name)
return
}
totalAvailableUsage[name].Sub(*node.usage[name])
}
}
@@ -274,7 +272,7 @@ func evictPodsFromSourceNodes(
klog.V(1).InfoS("Evicting pods based on priority, if they have same priority, they'll be evicted based on QoS tiers")
// sort the evictable Pods based on priority. This also sorts them based on QoS. If there are multiple pods with same priority, they are sorted based on QoS tiers.
podutil.SortPodsBasedOnPriorityLowToHigh(removablePods)
err := evictPods(ctx, evictableNamespaces, removablePods, node, totalAvailableUsage, taintsOfDestinationNodes, podEvictor, evictOptions, continueEviction)
err := evictPods(ctx, evictableNamespaces, removablePods, node, totalAvailableUsage, taintsOfDestinationNodes, podEvictor, evictOptions, continueEviction, usageSnapshot)
if err != nil {
switch err.(type) {
case *evictions.EvictionTotalLimitError:
@@ -295,6 +293,7 @@ func evictPods(
podEvictor frameworktypes.Evictor,
evictOptions evictions.EvictOptions,
continueEviction continueEvictionCond,
usageSnapshot usageClient,
) error {
var excludedNamespaces sets.Set[string]
if evictableNamespaces != nil {
@@ -320,18 +319,21 @@ func evictPods(
if !preEvictionFilterWithOptions(pod) {
continue
}
podUsage, err := usageSnapshot.podUsage(pod)
if err != nil {
klog.Errorf("unable to get pod usage for %v/%v: %v", pod.Namespace, pod.Name, err)
continue
}
err = podEvictor.Evict(ctx, pod, evictOptions)
if err == nil {
klog.V(3).InfoS("Evicted pods", "pod", klog.KObj(pod))
for name := range totalAvailableUsage {
if name == v1.ResourcePods {
nodeInfo.usage[name].Sub(*resource.NewQuantity(1, resource.DecimalSI))
totalAvailableUsage[name].Sub(*resource.NewQuantity(1, resource.DecimalSI))
} else {
quantity := utils.GetResourceRequestQuantity(pod, name)
nodeInfo.usage[name].Sub(quantity)
totalAvailableUsage[name].Sub(quantity)
nodeInfo.usage[name].Sub(*podUsage[name])
totalAvailableUsage[name].Sub(*podUsage[name])
}
}
@@ -437,17 +439,12 @@ func classifyPods(pods []*v1.Pod, filter func(pod *v1.Pod) bool) ([]*v1.Pod, []*
return nonRemovablePods, removablePods
}
func averageNodeBasicresources(nodes []*v1.Node, getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc, resourceNames []v1.ResourceName) api.ResourceThresholds {
func averageNodeBasicresources(nodes []*v1.Node, usageClient usageClient) api.ResourceThresholds {
total := api.ResourceThresholds{}
average := api.ResourceThresholds{}
numberOfNodes := len(nodes)
for _, node := range nodes {
pods, err := podutil.ListPodsOnANode(node.Name, getPodsAssignedToNode, nil)
if err != nil {
numberOfNodes--
continue
}
usage := nodeutil.NodeUtilization(pods, resourceNames)
usage := usageClient.nodeUtilization(node.Name)
nodeCapacity := node.Status.Capacity
if len(node.Status.Allocatable) > 0 {
nodeCapacity = node.Status.Allocatable

View File

@@ -28,6 +28,7 @@ type LowNodeUtilizationArgs struct {
Thresholds api.ResourceThresholds `json:"thresholds"`
TargetThresholds api.ResourceThresholds `json:"targetThresholds"`
NumberOfNodes int `json:"numberOfNodes,omitempty"`
MetricsUtilization MetricsUtilization `json:metricsUtilization,omitempty`
// Naming this one differently since namespaces are still
// considered while considering resources used by pods
@@ -41,10 +42,19 @@ type LowNodeUtilizationArgs struct {
type HighNodeUtilizationArgs struct {
metav1.TypeMeta `json:",inline"`
Thresholds api.ResourceThresholds `json:"thresholds"`
NumberOfNodes int `json:"numberOfNodes,omitempty"`
Thresholds api.ResourceThresholds `json:"thresholds"`
NumberOfNodes int `json:"numberOfNodes,omitempty"`
MetricsUtilization MetricsUtilization `json:metricsUtilization,omitempty`
// Naming this one differently since namespaces are still
// considered while considering resources used by pods
// but then filtered out before eviction
EvictableNamespaces *api.Namespaces `json:"evictableNamespaces,omitempty"`
}
// MetricsUtilization allow to consume actual resource utilization from metrics
type MetricsUtilization struct {
// metricsServer enables metrics from a kubernetes metrics server.
// Please see https://kubernetes-sigs.github.io/metrics-server/ for more.
MetricsServer bool `json:"metricsServer,omitempty"`
}

View File

@@ -0,0 +1,202 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodeutilization
import (
"context"
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
utilptr "k8s.io/utils/ptr"
"sigs.k8s.io/descheduler/pkg/descheduler/metricscollector"
nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
"sigs.k8s.io/descheduler/pkg/utils"
)
type usageClient interface {
nodeUtilization(node string) map[v1.ResourceName]*resource.Quantity
nodes() []*v1.Node
pods(node string) []*v1.Pod
capture(nodes []*v1.Node) error
podUsage(pod *v1.Pod) (map[v1.ResourceName]*resource.Quantity, error)
}
type requestedUsageClient struct {
resourceNames []v1.ResourceName
getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc
_nodes []*v1.Node
_pods map[string][]*v1.Pod
_nodeUtilization map[string]map[v1.ResourceName]*resource.Quantity
}
var _ usageClient = &requestedUsageClient{}
func newRequestedUsageSnapshot(
resourceNames []v1.ResourceName,
getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc,
) *requestedUsageClient {
return &requestedUsageClient{
resourceNames: resourceNames,
getPodsAssignedToNode: getPodsAssignedToNode,
}
}
func (s *requestedUsageClient) nodeUtilization(node string) map[v1.ResourceName]*resource.Quantity {
return s._nodeUtilization[node]
}
func (s *requestedUsageClient) nodes() []*v1.Node {
return s._nodes
}
func (s *requestedUsageClient) pods(node string) []*v1.Pod {
return s._pods[node]
}
func (s *requestedUsageClient) podUsage(pod *v1.Pod) (map[v1.ResourceName]*resource.Quantity, error) {
usage := make(map[v1.ResourceName]*resource.Quantity)
for _, resourceName := range s.resourceNames {
usage[resourceName] = utilptr.To[resource.Quantity](utils.GetResourceRequestQuantity(pod, resourceName).DeepCopy())
}
return usage, nil
}
func (s *requestedUsageClient) capture(nodes []*v1.Node) error {
s._nodeUtilization = make(map[string]map[v1.ResourceName]*resource.Quantity)
s._pods = make(map[string][]*v1.Pod)
capturedNodes := []*v1.Node{}
for _, node := range nodes {
pods, err := podutil.ListPodsOnANode(node.Name, s.getPodsAssignedToNode, nil)
if err != nil {
klog.V(2).InfoS("Node will not be processed, error accessing its pods", "node", klog.KObj(node), "err", err)
continue
}
nodeUsage, err := nodeutil.NodeUtilization(pods, s.resourceNames, func(pod *v1.Pod) (v1.ResourceList, error) {
req, _ := utils.PodRequestsAndLimits(pod)
return req, nil
})
if err != nil {
return err
}
// store the snapshot of pods from the same (or the closest) node utilization computation
s._pods[node.Name] = pods
s._nodeUtilization[node.Name] = nodeUsage
capturedNodes = append(capturedNodes, node)
}
s._nodes = capturedNodes
return nil
}
type actualUsageClient struct {
resourceNames []v1.ResourceName
getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc
metricsCollector *metricscollector.MetricsCollector
_nodes []*v1.Node
_pods map[string][]*v1.Pod
_nodeUtilization map[string]map[v1.ResourceName]*resource.Quantity
}
var _ usageClient = &actualUsageClient{}
func newActualUsageSnapshot(
resourceNames []v1.ResourceName,
getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc,
metricsCollector *metricscollector.MetricsCollector,
) *actualUsageClient {
return &actualUsageClient{
resourceNames: resourceNames,
getPodsAssignedToNode: getPodsAssignedToNode,
metricsCollector: metricsCollector,
}
}
func (client *actualUsageClient) nodeUtilization(node string) map[v1.ResourceName]*resource.Quantity {
return client._nodeUtilization[node]
}
func (client *actualUsageClient) nodes() []*v1.Node {
return client._nodes
}
func (client *actualUsageClient) pods(node string) []*v1.Pod {
return client._pods[node]
}
func (client *actualUsageClient) podUsage(pod *v1.Pod) (map[v1.ResourceName]*resource.Quantity, error) {
// It's not efficient to keep track of all pods in a cluster when only their fractions is evicted.
// Thus, take the current pod metrics without computing any softening (like e.g. EWMA).
podMetrics, err := client.metricsCollector.MetricsClient().MetricsV1beta1().PodMetricses(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("unable to get podmetrics for %q/%q: %v", pod.Namespace, pod.Name, err)
}
totalUsage := make(map[v1.ResourceName]*resource.Quantity)
for _, container := range podMetrics.Containers {
for _, resourceName := range client.resourceNames {
if _, exists := container.Usage[resourceName]; !exists {
continue
}
if totalUsage[resourceName] == nil {
totalUsage[resourceName] = utilptr.To[resource.Quantity](container.Usage[resourceName].DeepCopy())
} else {
totalUsage[resourceName].Add(container.Usage[resourceName])
}
}
}
return totalUsage, nil
}
func (client *actualUsageClient) capture(nodes []*v1.Node) error {
client._nodeUtilization = make(map[string]map[v1.ResourceName]*resource.Quantity)
client._pods = make(map[string][]*v1.Pod)
capturedNodes := []*v1.Node{}
for _, node := range nodes {
pods, err := podutil.ListPodsOnANode(node.Name, client.getPodsAssignedToNode, nil)
if err != nil {
klog.V(2).InfoS("Node will not be processed, error accessing its pods", "node", klog.KObj(node), "err", err)
continue
}
nodeUsage, err := client.metricsCollector.NodeUsage(node)
if err != nil {
return err
}
nodeUsage[v1.ResourcePods] = resource.NewQuantity(int64(len(pods)), resource.DecimalSI)
// store the snapshot of pods from the same (or the closest) node utilization computation
client._pods[node.Name] = pods
client._nodeUtilization[node.Name] = nodeUsage
capturedNodes = append(capturedNodes, node)
}
client._nodes = capturedNodes
return nil
}

View File

@@ -0,0 +1,135 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodeutilization
import (
"context"
"fmt"
"testing"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/informers"
fakeclientset "k8s.io/client-go/kubernetes/fake"
"k8s.io/metrics/pkg/apis/metrics/v1beta1"
fakemetricsclient "k8s.io/metrics/pkg/client/clientset/versioned/fake"
"sigs.k8s.io/descheduler/pkg/descheduler/metricscollector"
podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
"sigs.k8s.io/descheduler/test"
)
var gvr = schema.GroupVersionResource{Group: "metrics.k8s.io", Version: "v1beta1", Resource: "nodemetricses"}
func updateMetricsAndCheckNodeUtilization(
t *testing.T,
ctx context.Context,
newValue, expectedValue int64,
metricsClientset *fakemetricsclient.Clientset,
collector *metricscollector.MetricsCollector,
usageSnapshot usageClient,
nodes []*v1.Node,
nodeName string,
nodemetrics *v1beta1.NodeMetrics,
) {
t.Logf("Set current node cpu usage to %v", newValue)
nodemetrics.Usage[v1.ResourceCPU] = *resource.NewMilliQuantity(newValue, resource.DecimalSI)
metricsClientset.Tracker().Update(gvr, nodemetrics, "")
err := collector.Collect(ctx)
if err != nil {
t.Fatalf("failed to capture metrics: %v", err)
}
err = usageSnapshot.capture(nodes)
if err != nil {
t.Fatalf("failed to capture a snapshot: %v", err)
}
nodeUtilization := usageSnapshot.nodeUtilization(nodeName)
t.Logf("current node cpu usage: %v\n", nodeUtilization[v1.ResourceCPU].MilliValue())
if nodeUtilization[v1.ResourceCPU].MilliValue() != expectedValue {
t.Fatalf("cpu node usage expected to be %v, got %v instead", expectedValue, nodeUtilization[v1.ResourceCPU].MilliValue())
}
pods := usageSnapshot.pods(nodeName)
fmt.Printf("pods: %#v\n", pods)
if len(pods) != 2 {
t.Fatalf("expected 2 pods for node %v, got %v instead", nodeName, len(pods))
}
capturedNodes := usageSnapshot.nodes()
if len(capturedNodes) != 3 {
t.Fatalf("expected 3 captured node, got %v instead", len(capturedNodes))
}
}
func TestActualUsageClient(t *testing.T) {
n1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
n2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
n3 := test.BuildTestNode("n3", 2000, 3000, 10, nil)
p1 := test.BuildTestPod("p1", 400, 0, n1.Name, nil)
p21 := test.BuildTestPod("p21", 400, 0, n2.Name, nil)
p22 := test.BuildTestPod("p22", 400, 0, n2.Name, nil)
p3 := test.BuildTestPod("p3", 400, 0, n3.Name, nil)
nodes := []*v1.Node{n1, n2, n3}
n1metrics := test.BuildNodeMetrics("n1", 400, 1714978816)
n2metrics := test.BuildNodeMetrics("n2", 1400, 1714978816)
n3metrics := test.BuildNodeMetrics("n3", 300, 1714978816)
clientset := fakeclientset.NewSimpleClientset(n1, n2, n3, p1, p21, p22, p3)
metricsClientset := fakemetricsclient.NewSimpleClientset(n1metrics, n2metrics, n3metrics)
ctx := context.TODO()
resourceNames := []v1.ResourceName{
v1.ResourceCPU,
v1.ResourceMemory,
}
sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0)
podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
podsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer)
if err != nil {
t.Fatalf("Build get pods assigned to node function error: %v", err)
}
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())
collector := metricscollector.NewMetricsCollector(clientset, metricsClientset)
usageSnapshot := newActualUsageSnapshot(
resourceNames,
podsAssignedToNode,
collector,
)
updateMetricsAndCheckNodeUtilization(t, ctx,
1400, 1400,
metricsClientset, collector, usageSnapshot, nodes, n2.Name, n2metrics,
)
updateMetricsAndCheckNodeUtilization(t, ctx,
500, 1310,
metricsClientset, collector, usageSnapshot, nodes, n2.Name, n2metrics,
)
updateMetricsAndCheckNodeUtilization(t, ctx,
900, 1269,
metricsClientset, collector, usageSnapshot, nodes, n2.Name, n2metrics,
)
}

View File

@@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"sigs.k8s.io/descheduler/pkg/descheduler/metricscollector"
"k8s.io/klog/v2"
)
@@ -67,6 +68,7 @@ func (ei *evictorImpl) Evict(ctx context.Context, pod *v1.Pod, opts evictions.Ev
// handleImpl implements the framework handle which gets passed to plugins
type handleImpl struct {
clientSet clientset.Interface
metricsCollector *metricscollector.MetricsCollector
getPodsAssignedToNodeFunc podutil.GetPodsAssignedToNodeFunc
sharedInformerFactory informers.SharedInformerFactory
evictor *evictorImpl
@@ -79,6 +81,10 @@ func (hi *handleImpl) ClientSet() clientset.Interface {
return hi.clientSet
}
func (hi *handleImpl) MetricsCollector() *metricscollector.MetricsCollector {
return hi.metricsCollector
}
// GetPodsAssignedToNodeFunc retrieves GetPodsAssignedToNodeFunc implementation
func (hi *handleImpl) GetPodsAssignedToNodeFunc() podutil.GetPodsAssignedToNodeFunc {
return hi.getPodsAssignedToNodeFunc
@@ -128,6 +134,7 @@ type handleImplOpts struct {
sharedInformerFactory informers.SharedInformerFactory
getPodsAssignedToNodeFunc podutil.GetPodsAssignedToNodeFunc
podEvictor *evictions.PodEvictor
metricsCollector *metricscollector.MetricsCollector
}
// WithClientSet sets clientSet for the scheduling frameworkImpl.
@@ -155,6 +162,12 @@ func WithGetPodsAssignedToNodeFnc(getPodsAssignedToNodeFunc podutil.GetPodsAssig
}
}
func WithMetricsCollector(metricsCollector *metricscollector.MetricsCollector) Option {
return func(o *handleImplOpts) {
o.metricsCollector = metricsCollector
}
}
func getPluginConfig(pluginName string, pluginConfigs []api.PluginConfig) (*api.PluginConfig, int) {
for idx, pluginConfig := range pluginConfigs {
if pluginConfig.Name == pluginName {
@@ -253,6 +266,7 @@ func NewProfile(config api.DeschedulerProfile, reg pluginregistry.Registry, opts
profileName: config.Name,
podEvictor: hOpts.podEvictor,
},
metricsCollector: hOpts.metricsCollector,
}
pluginNames := append(config.Plugins.Deschedule.Enabled, config.Plugins.Balance.Enabled...)

View File

@@ -22,6 +22,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"sigs.k8s.io/descheduler/pkg/descheduler/metricscollector"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
@@ -36,6 +37,7 @@ type Handle interface {
Evictor() Evictor
GetPodsAssignedToNodeFunc() podutil.GetPodsAssignedToNodeFunc
SharedInformerFactory() informers.SharedInformerFactory
MetricsCollector() *metricscollector.MetricsCollector
}
// Evictor defines an interface for filtering and evicting pods

View File

@@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/metrics/pkg/apis/metrics/v1beta1"
utilptr "k8s.io/utils/ptr"
)
@@ -67,6 +68,26 @@ func BuildTestPod(name string, cpu, memory int64, nodeName string, apply func(*v
return pod
}
// BuildPodMetrics creates a test podmetrics with given parameters.
func BuildPodMetrics(name string, millicpu, mem int64) *v1beta1.PodMetrics {
return &v1beta1.PodMetrics{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "default",
},
Window: metav1.Duration{Duration: 20010000000},
Containers: []v1beta1.ContainerMetrics{
{
Name: "container-1",
Usage: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(millicpu, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(mem, resource.BinarySI),
},
},
},
}
}
// GetMirrorPodAnnotation returns the annotation needed for mirror pod.
func GetMirrorPodAnnotation() map[string]string {
return map[string]string{
@@ -135,6 +156,19 @@ func BuildTestNode(name string, millicpu, mem, pods int64, apply func(*v1.Node))
return node
}
func BuildNodeMetrics(name string, millicpu, mem int64) *v1beta1.NodeMetrics {
return &v1beta1.NodeMetrics{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Window: metav1.Duration{Duration: 20010000000},
Usage: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(millicpu, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(mem, resource.BinarySI),
},
}
}
// MakeBestEffortPod makes the given pod a BestEffort pod
func MakeBestEffortPod(pod *v1.Pod) {
pod.Spec.Containers[0].Resources.Requests = nil

View File

@@ -0,0 +1,77 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by client-gen. DO NOT EDIT.
package fake
import (
"context"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
labels "k8s.io/apimachinery/pkg/labels"
watch "k8s.io/apimachinery/pkg/watch"
testing "k8s.io/client-go/testing"
v1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1"
)
// FakeNodeMetricses implements NodeMetricsInterface
type FakeNodeMetricses struct {
Fake *FakeMetricsV1beta1
}
var nodemetricsesResource = v1beta1.SchemeGroupVersion.WithResource("nodemetricses")
var nodemetricsesKind = v1beta1.SchemeGroupVersion.WithKind("NodeMetrics")
// Get takes name of the nodeMetrics, and returns the corresponding nodeMetrics object, and an error if there is any.
func (c *FakeNodeMetricses) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1beta1.NodeMetrics, err error) {
emptyResult := &v1beta1.NodeMetrics{}
obj, err := c.Fake.
Invokes(testing.NewRootGetActionWithOptions(nodemetricsesResource, name, options), emptyResult)
if obj == nil {
return emptyResult, err
}
return obj.(*v1beta1.NodeMetrics), err
}
// List takes label and field selectors, and returns the list of NodeMetricses that match those selectors.
func (c *FakeNodeMetricses) List(ctx context.Context, opts v1.ListOptions) (result *v1beta1.NodeMetricsList, err error) {
emptyResult := &v1beta1.NodeMetricsList{}
obj, err := c.Fake.
Invokes(testing.NewRootListActionWithOptions(nodemetricsesResource, nodemetricsesKind, opts), emptyResult)
if obj == nil {
return emptyResult, err
}
label, _, _ := testing.ExtractFromListOptions(opts)
if label == nil {
label = labels.Everything()
}
list := &v1beta1.NodeMetricsList{ListMeta: obj.(*v1beta1.NodeMetricsList).ListMeta}
for _, item := range obj.(*v1beta1.NodeMetricsList).Items {
if label.Matches(labels.Set(item.Labels)) {
list.Items = append(list.Items, item)
}
}
return list, err
}
// Watch returns a watch.Interface that watches the requested nodeMetricses.
func (c *FakeNodeMetricses) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) {
return c.Fake.
InvokesWatch(testing.NewRootWatchActionWithOptions(nodemetricsesResource, opts))
}

View File

@@ -0,0 +1,81 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by client-gen. DO NOT EDIT.
package fake
import (
"context"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
labels "k8s.io/apimachinery/pkg/labels"
watch "k8s.io/apimachinery/pkg/watch"
testing "k8s.io/client-go/testing"
v1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1"
)
// FakePodMetricses implements PodMetricsInterface
type FakePodMetricses struct {
Fake *FakeMetricsV1beta1
ns string
}
var podmetricsesResource = v1beta1.SchemeGroupVersion.WithResource("podmetricses")
var podmetricsesKind = v1beta1.SchemeGroupVersion.WithKind("PodMetrics")
// Get takes name of the podMetrics, and returns the corresponding podMetrics object, and an error if there is any.
func (c *FakePodMetricses) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1beta1.PodMetrics, err error) {
emptyResult := &v1beta1.PodMetrics{}
obj, err := c.Fake.
Invokes(testing.NewGetActionWithOptions(podmetricsesResource, c.ns, name, options), emptyResult)
if obj == nil {
return emptyResult, err
}
return obj.(*v1beta1.PodMetrics), err
}
// List takes label and field selectors, and returns the list of PodMetricses that match those selectors.
func (c *FakePodMetricses) List(ctx context.Context, opts v1.ListOptions) (result *v1beta1.PodMetricsList, err error) {
emptyResult := &v1beta1.PodMetricsList{}
obj, err := c.Fake.
Invokes(testing.NewListActionWithOptions(podmetricsesResource, podmetricsesKind, c.ns, opts), emptyResult)
if obj == nil {
return emptyResult, err
}
label, _, _ := testing.ExtractFromListOptions(opts)
if label == nil {
label = labels.Everything()
}
list := &v1beta1.PodMetricsList{ListMeta: obj.(*v1beta1.PodMetricsList).ListMeta}
for _, item := range obj.(*v1beta1.PodMetricsList).Items {
if label.Matches(labels.Set(item.Labels)) {
list.Items = append(list.Items, item)
}
}
return list, err
}
// Watch returns a watch.Interface that watches the requested podMetricses.
func (c *FakePodMetricses) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) {
return c.Fake.
InvokesWatch(testing.NewWatchActionWithOptions(podmetricsesResource, c.ns, opts))
}