mirror of https://github.com/kubernetes-sigs/descheduler.git synced 2026-01-26 05:14:13 +01:00

Compare commits


8 Commits

Author SHA1 Message Date
Jan Chaloupka b56794708d descheduler: wire the metrics collector with the framework handle 2024-11-05 21:13:27 +01:00
Jan Chaloupka b7b352780e LowNodeUtilization: test metrics based utilization 2024-11-05 21:11:33 +01:00
Jan Chaloupka 646a383b37 Get pod usage from the usage client 2024-11-05 14:07:59 +01:00
Jan Chaloupka ad18f41b66 Update actualUsageClient 2024-11-04 18:11:27 +01:00
Jan Chaloupka 80f9c0ada6 Separate usage client into a new file 2024-10-21 22:38:25 +02:00
Jan Chaloupka 3174107718 usageSnapshot -> requestedUsageClient 2024-10-21 22:22:38 +02:00
Jan Chaloupka 1f55c4d680 node utilization: abstract pod utilization retriever 2024-10-15 12:18:37 +02:00
Jan Chaloupka dc9bea3ede nodeutiliation: create a usage snapshot 2024-10-15 12:18:30 +02:00
23 changed files with 1519 additions and 148 deletions
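Taken together, the series teaches LowNodeUtilization to classify nodes by measured usage from the metrics.k8s.io API instead of the pods' resource requests: the policy grows a MetricsCollector switch, the server gains a metrics client, a collector polls node metrics, and the plugin picks a requested- or metrics-backed usage client. The fragment below is only an orientation sketch assembled from the types added in this series; it is not part of the diff, and the package clause and profile name are made up.

package policysketch

import (
	"sigs.k8s.io/descheduler/pkg/api"
	"sigs.k8s.io/descheduler/pkg/framework/plugins/nodeutilization"
)

// loadAwarePolicy shows the two knobs introduced here:
//   - DeschedulerPolicy.MetricsCollector.Enabled starts the node-metrics poller,
//   - LowNodeUtilizationArgs.MetricsUtilization.MetricsServer switches the plugin
//     from requested resources to the collected actual utilization.
// Thresholds, evictor wiring and plugin enablement are omitted; see the
// lowNodeUtilizationPolicy test helper further down for a complete policy.
func loadAwarePolicy() *api.DeschedulerPolicy {
	return &api.DeschedulerPolicy{
		MetricsCollector: api.MetricsCollector{Enabled: true},
		Profiles: []api.DeschedulerProfile{
			{
				Name: "load-aware",
				PluginConfigs: []api.PluginConfig{
					{
						Name: nodeutilization.LowNodeUtilizationPluginName,
						Args: &nodeutilization.LowNodeUtilizationArgs{
							MetricsUtilization: nodeutilization.MetricsUtilization{MetricsServer: true},
						},
					},
				},
			},
		},
	}
}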

View File

@@ -26,6 +26,8 @@ import (
clientset "k8s.io/client-go/kubernetes"
componentbaseconfig "k8s.io/component-base/config"
componentbaseoptions "k8s.io/component-base/config/options"
+metricsclient "k8s.io/metrics/pkg/client/clientset/versioned"
"sigs.k8s.io/descheduler/pkg/apis/componentconfig"
"sigs.k8s.io/descheduler/pkg/apis/componentconfig/v1alpha1"
deschedulerscheme "sigs.k8s.io/descheduler/pkg/descheduler/scheme"
@@ -42,6 +44,7 @@ type DeschedulerServer struct {
Client clientset.Interface
EventClient clientset.Interface
+MetricsClient metricsclient.Interface
SecureServing *apiserveroptions.SecureServingOptionsWithLoopback
DisableMetrics bool
EnableHTTP2 bool

View File

@@ -41,6 +41,9 @@ type DeschedulerPolicy struct {
// MaxNoOfPodsToTotal restricts maximum of pods to be evicted total.
MaxNoOfPodsToEvictTotal *uint
+// MetricsCollector configures collection of metrics about actual resource utilization
+MetricsCollector MetricsCollector
}
// Namespaces carries a list of included/excluded namespaces
@@ -84,3 +87,10 @@ type PluginSet struct {
Enabled []string
Disabled []string
}
+// MetricsCollector configures collection of metrics about actual resource utilization
+type MetricsCollector struct {
+// Enabled metrics collection from kubernetes metrics.
+// Later, the collection can be extended to other providers.
+Enabled bool
+}

View File

@@ -40,6 +40,9 @@ type DeschedulerPolicy struct {
// MaxNoOfPodsToTotal restricts maximum of pods to be evicted total.
MaxNoOfPodsToEvictTotal *uint `json:"maxNoOfPodsToEvictTotal,omitempty"`
+// MetricsCollector configures collection of metrics about actual resource utilization
+MetricsCollector MetricsCollector `json:"metricsCollector,omitempty"`
}
type DeschedulerProfile struct {
@@ -66,3 +69,10 @@ type PluginSet struct {
Enabled []string `json:"enabled"`
Disabled []string `json:"disabled"`
}
+// MetricsCollector configures collection of metrics about actual resource utilization
+type MetricsCollector struct {
+// Enabled metrics collection from kubernetes metrics.
+// Later, the collection can be extended to other providers.
+Enabled bool
+}

View File

@@ -21,6 +21,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
componentbaseconfig "k8s.io/component-base/config"
+metricsclient "k8s.io/metrics/pkg/client/clientset/versioned"
// Ensure to load all auth plugins.
_ "k8s.io/client-go/plugin/pkg/client/auth"
@@ -28,7 +29,7 @@ import (
"k8s.io/client-go/tools/clientcmd"
)
-func CreateClient(clientConnection componentbaseconfig.ClientConnectionConfiguration, userAgt string) (clientset.Interface, error) {
+func createConfig(clientConnection componentbaseconfig.ClientConnectionConfiguration, userAgt string) (*rest.Config, error) {
var cfg *rest.Config
if len(clientConnection.Kubeconfig) != 0 {
master, err := GetMasterFromKubeconfig(clientConnection.Kubeconfig)
@@ -56,9 +57,28 @@ func CreateClient(clientConnection componentbaseconfig.ClientConnectionConfigura
cfg = rest.AddUserAgent(cfg, userAgt)
}
+return cfg, nil
+}
+func CreateClient(clientConnection componentbaseconfig.ClientConnectionConfiguration, userAgt string) (clientset.Interface, error) {
+cfg, err := createConfig(clientConnection, userAgt)
+if err != nil {
+return nil, fmt.Errorf("unable to create config: %v", err)
+}
return clientset.NewForConfig(cfg)
}
+func CreateMetricsClient(clientConnection componentbaseconfig.ClientConnectionConfiguration, userAgt string) (metricsclient.Interface, error) {
+cfg, err := createConfig(clientConnection, userAgt)
+if err != nil {
+return nil, fmt.Errorf("unable to create config: %v", err)
+}
+// Create the metrics clientset to access the metrics.k8s.io API
+return metricsclient.NewForConfig(cfg)
+}
func GetMasterFromKubeconfig(filename string) (string, error) {
config, err := clientcmd.LoadFromFile(filename)
if err != nil {

View File

@@ -23,43 +23,41 @@ import (
"strconv"
"time"
-schedulingv1 "k8s.io/api/scheduling/v1"
-"k8s.io/apimachinery/pkg/runtime/schema"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
+v1 "k8s.io/api/core/v1"
+policy "k8s.io/api/policy/v1"
+schedulingv1 "k8s.io/api/scheduling/v1"
"k8s.io/apimachinery/pkg/api/meta"
+"k8s.io/apimachinery/pkg/labels"
+"k8s.io/apimachinery/pkg/runtime"
+"k8s.io/apimachinery/pkg/runtime/schema"
+utilversion "k8s.io/apimachinery/pkg/util/version"
+"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/informers"
+clientset "k8s.io/client-go/kubernetes"
+fakeclientset "k8s.io/client-go/kubernetes/fake"
+core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/events"
componentbaseconfig "k8s.io/component-base/config"
"k8s.io/klog/v2"
-v1 "k8s.io/api/core/v1"
-policy "k8s.io/api/policy/v1"
-"k8s.io/apimachinery/pkg/labels"
-"k8s.io/apimachinery/pkg/runtime"
-utilversion "k8s.io/apimachinery/pkg/util/version"
-"k8s.io/apimachinery/pkg/util/wait"
-clientset "k8s.io/client-go/kubernetes"
-fakeclientset "k8s.io/client-go/kubernetes/fake"
-core "k8s.io/client-go/testing"
-"sigs.k8s.io/descheduler/pkg/descheduler/client"
-eutils "sigs.k8s.io/descheduler/pkg/descheduler/evictions/utils"
-nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
-"sigs.k8s.io/descheduler/pkg/tracing"
-"sigs.k8s.io/descheduler/pkg/utils"
-"sigs.k8s.io/descheduler/pkg/version"
"sigs.k8s.io/descheduler/cmd/descheduler/app/options"
"sigs.k8s.io/descheduler/metrics"
"sigs.k8s.io/descheduler/pkg/api"
+"sigs.k8s.io/descheduler/pkg/descheduler/client"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
+eutils "sigs.k8s.io/descheduler/pkg/descheduler/evictions/utils"
+"sigs.k8s.io/descheduler/pkg/descheduler/metricscollector"
+nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
"sigs.k8s.io/descheduler/pkg/framework/pluginregistry"
frameworkprofile "sigs.k8s.io/descheduler/pkg/framework/profile"
frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
+"sigs.k8s.io/descheduler/pkg/tracing"
+"sigs.k8s.io/descheduler/pkg/utils"
+"sigs.k8s.io/descheduler/pkg/version"
)
type eprunner func(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status
@@ -78,6 +76,7 @@ type descheduler struct {
eventRecorder events.EventRecorder
podEvictor *evictions.PodEvictor
podEvictionReactionFnc func(*fakeclientset.Clientset) func(action core.Action) (bool, runtime.Object, error)
+metricsCollector *metricscollector.MetricsCollector
}
type informerResources struct {
@@ -152,6 +151,11 @@ func newDescheduler(rs *options.DeschedulerServer, deschedulerPolicy *api.Desche
WithMetricsEnabled(!rs.DisableMetrics),
)
+var metricsCollector *metricscollector.MetricsCollector
+if deschedulerPolicy.MetricsCollector.Enabled {
+metricsCollector = metricscollector.NewMetricsCollector(rs.Client, rs.MetricsClient)
+}
return &descheduler{
rs: rs,
ir: ir,
@@ -161,6 +165,7 @@ func newDescheduler(rs *options.DeschedulerServer, deschedulerPolicy *api.Desche
eventRecorder: eventRecorder,
podEvictor: podEvictor,
podEvictionReactionFnc: podEvictionReactionFnc,
+metricsCollector: metricsCollector,
}, nil
}
@@ -240,6 +245,7 @@ func (d *descheduler) runProfiles(ctx context.Context, client clientset.Interfac
frameworkprofile.WithSharedInformerFactory(d.sharedInformerFactory),
frameworkprofile.WithPodEvictor(d.podEvictor),
frameworkprofile.WithGetPodsAssignedToNodeFnc(d.getPodsAssignedToNode),
+frameworkprofile.WithMetricsCollector(d.metricsCollector),
)
if err != nil {
klog.ErrorS(err, "unable to create a profile", "profile", profile.Name)
@@ -304,6 +310,14 @@ func Run(ctx context.Context, rs *options.DeschedulerServer) error {
return err
}
+if deschedulerPolicy.MetricsCollector.Enabled {
+metricsClient, err := client.CreateMetricsClient(clientConnection, "descheduler")
+if err != nil {
+return err
+}
+rs.MetricsClient = metricsClient
+}
runFn := func() error {
return RunDeschedulerStrategies(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion)
}
@@ -411,6 +425,12 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())
+go func() {
+klog.V(2).Infof("Starting metrics collector")
+descheduler.metricsCollector.Run(ctx)
+klog.V(2).Infof("Stopped metrics collector")
+}()
wait.NonSlidingUntil(func() {
// A next context is created here intentionally to avoid nesting the spans via context.
sCtx, sSpan := tracing.Tracer().Start(ctx, "NonSlidingUntil")

View File

@@ -16,11 +16,16 @@ import (
fakeclientset "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/klog/v2"
+"k8s.io/metrics/pkg/apis/metrics/v1beta1"
+fakemetricsclient "k8s.io/metrics/pkg/client/clientset/versioned/fake"
utilptr "k8s.io/utils/ptr"
"sigs.k8s.io/descheduler/cmd/descheduler/app/options"
"sigs.k8s.io/descheduler/pkg/api"
+"sigs.k8s.io/descheduler/pkg/descheduler/metricscollector"
"sigs.k8s.io/descheduler/pkg/framework/pluginregistry"
"sigs.k8s.io/descheduler/pkg/framework/plugins/defaultevictor"
+"sigs.k8s.io/descheduler/pkg/framework/plugins/nodeutilization"
"sigs.k8s.io/descheduler/pkg/framework/plugins/removeduplicates"
"sigs.k8s.io/descheduler/pkg/framework/plugins/removepodsviolatingnodetaints"
"sigs.k8s.io/descheduler/pkg/utils"
@@ -33,6 +38,7 @@ func initPluginRegistry() {
pluginregistry.Register(removeduplicates.PluginName, removeduplicates.New, &removeduplicates.RemoveDuplicates{}, &removeduplicates.RemoveDuplicatesArgs{}, removeduplicates.ValidateRemoveDuplicatesArgs, removeduplicates.SetDefaults_RemoveDuplicatesArgs, pluginregistry.PluginRegistry)
pluginregistry.Register(defaultevictor.PluginName, defaultevictor.New, &defaultevictor.DefaultEvictor{}, &defaultevictor.DefaultEvictorArgs{}, defaultevictor.ValidateDefaultEvictorArgs, defaultevictor.SetDefaults_DefaultEvictorArgs, pluginregistry.PluginRegistry)
pluginregistry.Register(removepodsviolatingnodetaints.PluginName, removepodsviolatingnodetaints.New, &removepodsviolatingnodetaints.RemovePodsViolatingNodeTaints{}, &removepodsviolatingnodetaints.RemovePodsViolatingNodeTaintsArgs{}, removepodsviolatingnodetaints.ValidateRemovePodsViolatingNodeTaintsArgs, removepodsviolatingnodetaints.SetDefaults_RemovePodsViolatingNodeTaintsArgs, pluginregistry.PluginRegistry)
+pluginregistry.Register(nodeutilization.LowNodeUtilizationPluginName, nodeutilization.NewLowNodeUtilization, &nodeutilization.LowNodeUtilization{}, &nodeutilization.LowNodeUtilizationArgs{}, nodeutilization.ValidateLowNodeUtilizationArgs, nodeutilization.SetDefaults_LowNodeUtilizationArgs, pluginregistry.PluginRegistry)
}
func removePodsViolatingNodeTaintsPolicy() *api.DeschedulerPolicy {
@@ -99,6 +105,44 @@ func removeDuplicatesPolicy() *api.DeschedulerPolicy {
}
}
func lowNodeUtilizationPolicy(thresholds, targetThresholds api.ResourceThresholds, metricsEnabled bool) *api.DeschedulerPolicy {
return &api.DeschedulerPolicy{
Profiles: []api.DeschedulerProfile{
{
Name: "Profile",
PluginConfigs: []api.PluginConfig{
{
Name: nodeutilization.LowNodeUtilizationPluginName,
Args: &nodeutilization.LowNodeUtilizationArgs{
Thresholds: thresholds,
TargetThresholds: targetThresholds,
MetricsUtilization: nodeutilization.MetricsUtilization{
MetricsServer: metricsEnabled,
},
},
},
{
Name: defaultevictor.PluginName,
Args: &defaultevictor.DefaultEvictorArgs{},
},
},
Plugins: api.Plugins{
Filter: api.PluginSet{
Enabled: []string{
defaultevictor.PluginName,
},
},
Balance: api.PluginSet{
Enabled: []string{
nodeutilization.LowNodeUtilizationPluginName,
},
},
},
},
},
}
}
func initDescheduler(t *testing.T, ctx context.Context, internalDeschedulerPolicy *api.DeschedulerPolicy, objects ...runtime.Object) (*options.DeschedulerServer, *descheduler, *fakeclientset.Clientset) {
client := fakeclientset.NewSimpleClientset(objects...)
eventClient := fakeclientset.NewSimpleClientset(objects...)
@@ -539,3 +583,76 @@ func TestDeschedulingLimits(t *testing.T) {
})
}
}
func TestLoadAwareDescheduling(t *testing.T) {
initPluginRegistry()
ownerRef1 := test.GetReplicaSetOwnerRefList()
updatePod := func(pod *v1.Pod) {
pod.ObjectMeta.OwnerReferences = ownerRef1
}
ctx := context.Background()
node1 := test.BuildTestNode("n1", 2000, 3000, 10, taintNodeNoSchedule)
node2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
nodes := []*v1.Node{node1, node2}
p1 := test.BuildTestPod("p1", 300, 0, node1.Name, updatePod)
p2 := test.BuildTestPod("p2", 300, 0, node1.Name, updatePod)
p3 := test.BuildTestPod("p3", 300, 0, node1.Name, updatePod)
p4 := test.BuildTestPod("p4", 300, 0, node1.Name, updatePod)
p5 := test.BuildTestPod("p5", 300, 0, node1.Name, updatePod)
ctxCancel, cancel := context.WithCancel(ctx)
_, descheduler, client := initDescheduler(
t,
ctxCancel,
lowNodeUtilizationPolicy(
api.ResourceThresholds{
v1.ResourceCPU: 30,
v1.ResourcePods: 30,
},
api.ResourceThresholds{
v1.ResourceCPU: 50,
v1.ResourcePods: 50,
},
true, // enabled metrics utilization
),
node1, node2, p1, p2, p3, p4, p5)
defer cancel()
nodemetricses := []*v1beta1.NodeMetrics{
test.BuildNodeMetrics("n1", 2400, 3000),
test.BuildNodeMetrics("n2", 400, 0),
}
podmetricses := []*v1beta1.PodMetrics{
test.BuildPodMetrics("p1", 400, 0),
test.BuildPodMetrics("p2", 400, 0),
test.BuildPodMetrics("p3", 400, 0),
test.BuildPodMetrics("p4", 400, 0),
test.BuildPodMetrics("p5", 400, 0),
}
var metricsObjs []runtime.Object
for _, nodemetrics := range nodemetricses {
metricsObjs = append(metricsObjs, nodemetrics)
}
for _, podmetrics := range podmetricses {
metricsObjs = append(metricsObjs, podmetrics)
}
metricsClientset := fakemetricsclient.NewSimpleClientset(metricsObjs...)
descheduler.metricsCollector = metricscollector.NewMetricsCollector(client, metricsClientset)
descheduler.metricsCollector.Collect(ctx)
err := descheduler.runDeschedulerLoop(ctx, nodes)
if err != nil {
t.Fatalf("Unable to run a descheduling loop: %v", err)
}
totalEs := descheduler.podEvictor.TotalEvicted()
if totalEs != 2 {
t.Fatalf("Expected %v evictions in total, got %v instead", 2, totalEs)
}
t.Logf("Total evictions: %v", totalEs)
}

View File

@@ -0,0 +1,125 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metricscollector
import (
"context"
"fmt"
"math"
"sync"
"time"
"k8s.io/klog/v2"
metricsclient "k8s.io/metrics/pkg/client/clientset/versioned"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
utilptr "k8s.io/utils/ptr"
)
const (
beta float64 = 0.9
)
type MetricsCollector struct {
clientset kubernetes.Interface
metricsClientset metricsclient.Interface
nodes map[string]map[v1.ResourceName]*resource.Quantity
mu sync.Mutex
}
func NewMetricsCollector(clientset kubernetes.Interface, metricsClientset metricsclient.Interface) *MetricsCollector {
return &MetricsCollector{
clientset: clientset,
metricsClientset: metricsClientset,
nodes: make(map[string]map[v1.ResourceName]*resource.Quantity),
}
}
func (mc *MetricsCollector) Run(ctx context.Context) {
wait.NonSlidingUntil(func() {
mc.Collect(ctx)
}, 5*time.Second, ctx.Done())
}
func weightedAverage(prevValue, value int64) int64 {
return int64(math.Floor(beta*float64(prevValue) + (1-beta)*float64(value)))
}
func (mc *MetricsCollector) NodeUsage(node *v1.Node) (map[v1.ResourceName]*resource.Quantity, error) {
mc.mu.Lock()
defer mc.mu.Unlock()
if _, exists := mc.nodes[node.Name]; !exists {
klog.V(4).Infof("unable to find node %q in the collected metrics", node.Name)
return nil, fmt.Errorf("unable to find node %q in the collected metrics", node.Name)
}
return map[v1.ResourceName]*resource.Quantity{
v1.ResourceCPU: utilptr.To[resource.Quantity](mc.nodes[node.Name][v1.ResourceCPU].DeepCopy()),
v1.ResourceMemory: utilptr.To[resource.Quantity](mc.nodes[node.Name][v1.ResourceMemory].DeepCopy()),
}, nil
}
func (mc *MetricsCollector) MetricsClient() metricsclient.Interface {
return mc.metricsClientset
}
func (mc *MetricsCollector) Collect(ctx context.Context) error {
mc.mu.Lock()
defer mc.mu.Unlock()
nodes, err := mc.clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
if err != nil {
return fmt.Errorf("unable to list nodes: %v", err)
}
for _, node := range nodes.Items {
metrics, err := mc.metricsClientset.MetricsV1beta1().NodeMetricses().Get(context.TODO(), node.Name, metav1.GetOptions{})
if err != nil {
fmt.Printf("Error fetching metrics for node %s: %v\n", node.Name, err)
// No entry -> duplicate the previous value -> do nothing as beta*PV + (1-beta)*PV = PV
continue
}
if _, exists := mc.nodes[node.Name]; !exists {
mc.nodes[node.Name] = map[v1.ResourceName]*resource.Quantity{
v1.ResourceCPU: utilptr.To[resource.Quantity](metrics.Usage.Cpu().DeepCopy()),
v1.ResourceMemory: utilptr.To[resource.Quantity](metrics.Usage.Memory().DeepCopy()),
}
} else {
// get MilliValue to reduce loss of precision
mc.nodes[node.Name][v1.ResourceCPU].SetMilli(
weightedAverage(mc.nodes[node.Name][v1.ResourceCPU].MilliValue(), metrics.Usage.Cpu().MilliValue()),
)
mc.nodes[node.Name][v1.ResourceMemory].SetMilli(
weightedAverage(mc.nodes[node.Name][v1.ResourceMemory].MilliValue(), metrics.Usage.Memory().MilliValue()),
)
}
// Display CPU and memory usage
// fmt.Printf("%s: %vm, %vMi\n", node.Name, metrics.Usage.Cpu().MilliValue(), metrics.Usage.Memory().Value()/(1024*1024))
// fmt.Printf("%s: %vm, %vMi\n", node.Name, mc.nodes[node.Name][v1.ResourceCPU].MilliValue(), mc.nodes[node.Name][v1.ResourceMemory].Value()/(1024*1024))
}
fmt.Printf("--\n")
return nil
}
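The collector keeps one exponentially weighted moving average per node and resource: each scrape contributes only (1-beta) = 10% of its value, so a single outlier barely moves the stored usage, and a missing sample simply repeats the previous value. Below is a self-contained sketch of that smoothing step with made-up millicore samples; the names and numbers are illustrative, not taken from the diff.

package main

import (
	"fmt"
	"math"
)

// beta matches the collector's constant; the closer to 1, the more weight the
// previously stored value keeps over the newest sample.
const beta = 0.9

// weightedAverage mirrors the collector's update rule:
// next = floor(beta*previous + (1-beta)*current).
func weightedAverage(prev, cur int64) int64 {
	return int64(math.Floor(beta*float64(prev) + (1-beta)*float64(cur)))
}

func main() {
	// Hypothetical CPU readings for one node, in millicores. The first reading
	// seeds the stored value; later readings are blended in gradually.
	stored := int64(1400)
	for _, sample := range []int64{500, 900} {
		stored = weightedAverage(stored, sample)
		fmt.Printf("sample=%dm -> stored=%dm\n", sample, stored)
	}
}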

View File

@@ -0,0 +1,103 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metricscollector
import (
"context"
"os"
"testing"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
metricsclient "k8s.io/metrics/pkg/client/clientset/versioned"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/runtime/schema"
fakeclientset "k8s.io/client-go/kubernetes/fake"
fakemetricsclient "k8s.io/metrics/pkg/client/clientset/versioned/fake"
"sigs.k8s.io/descheduler/test"
)
func TestMetricsCollector1(t *testing.T) {
kubeconfig := os.Getenv("KUBECONFIG")
// Use the kubeconfig to build the Kubernetes client
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
panic(err.Error())
}
// Create the standard Kubernetes clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
// Create the metrics clientset to access the metrics.k8s.io API
metricsClientset, err := metricsclient.NewForConfig(config)
if err != nil {
panic(err.Error())
}
collector := NewMetricsCollector(clientset, metricsClientset)
collector.Run(context.TODO())
// collector.Collect(context.TODO())
}
func checkCpuNodeUsage(t *testing.T, usage map[v1.ResourceName]*resource.Quantity, millicpu int64) {
t.Logf("current node cpu usage: %v\n", usage[v1.ResourceCPU].MilliValue())
if usage[v1.ResourceCPU].MilliValue() != millicpu {
t.Fatalf("cpu node usage expected to be %v, got %v instead", millicpu, usage[v1.ResourceCPU].MilliValue())
}
}
func TestMetricsCollector2(t *testing.T) {
gvr := schema.GroupVersionResource{Group: "metrics.k8s.io", Version: "v1beta1", Resource: "nodemetricses"}
n1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
n2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
n3 := test.BuildTestNode("n3", 2000, 3000, 10, nil)
n1metrics := test.BuildNodeMetrics("n1", 400, 1714978816)
n2metrics := test.BuildNodeMetrics("n2", 1400, 1714978816)
n3metrics := test.BuildNodeMetrics("n3", 300, 1714978816)
clientset := fakeclientset.NewSimpleClientset(n1, n2, n3)
metricsClientset := fakemetricsclient.NewSimpleClientset(n1metrics, n2metrics, n3metrics)
t.Logf("Set initial node cpu usage to 1400")
collector := NewMetricsCollector(clientset, metricsClientset)
collector.Collect(context.TODO())
nodesUsage, _ := collector.NodeUsage(n2)
checkCpuNodeUsage(t, nodesUsage, 1400)
t.Logf("Set current node cpu usage to 500")
n2metrics.Usage[v1.ResourceCPU] = *resource.NewMilliQuantity(500, resource.DecimalSI)
metricsClientset.Tracker().Update(gvr, n2metrics, "")
collector.Collect(context.TODO())
nodesUsage, _ = collector.NodeUsage(n2)
checkCpuNodeUsage(t, nodesUsage, 1310)
t.Logf("Set current node cpu usage to 500")
n2metrics.Usage[v1.ResourceCPU] = *resource.NewMilliQuantity(900, resource.DecimalSI)
metricsClientset.Tracker().Update(gvr, n2metrics, "")
collector.Collect(context.TODO())
nodesUsage, _ = collector.NodeUsage(n2)
checkCpuNodeUsage(t, nodesUsage, 1268)
}

View File

@@ -218,7 +218,12 @@ func fitsRequest(nodeIndexer podutil.GetPodsAssignedToNodeFunc, pod *v1.Pod, nod
resourceNames = append(resourceNames, name)
}
-availableResources, err := nodeAvailableResources(nodeIndexer, node, resourceNames)
+availableResources, err := nodeAvailableResources(nodeIndexer, node, resourceNames,
+func(pod *v1.Pod) (v1.ResourceList, error) {
+req, _ := utils.PodRequestsAndLimits(pod)
+return req, nil
+},
+)
if err != nil {
return false, err
}
@@ -239,12 +244,15 @@ func fitsRequest(nodeIndexer podutil.GetPodsAssignedToNodeFunc, pod *v1.Pod, nod
}
// nodeAvailableResources returns resources mapped to the quanitity available on the node.
-func nodeAvailableResources(nodeIndexer podutil.GetPodsAssignedToNodeFunc, node *v1.Node, resourceNames []v1.ResourceName) (map[v1.ResourceName]*resource.Quantity, error) {
+func nodeAvailableResources(nodeIndexer podutil.GetPodsAssignedToNodeFunc, node *v1.Node, resourceNames []v1.ResourceName, podUtilization podutil.PodUtilizationFnc) (map[v1.ResourceName]*resource.Quantity, error) {
podsOnNode, err := podutil.ListPodsOnANode(node.Name, nodeIndexer, nil)
if err != nil {
return nil, err
}
-nodeUtilization := NodeUtilization(podsOnNode, resourceNames)
+nodeUtilization, err := NodeUtilization(podsOnNode, resourceNames, podUtilization)
+if err != nil {
+return nil, err
+}
remainingResources := map[v1.ResourceName]*resource.Quantity{
v1.ResourceCPU: resource.NewMilliQuantity(node.Status.Allocatable.Cpu().MilliValue()-nodeUtilization[v1.ResourceCPU].MilliValue(), resource.DecimalSI),
v1.ResourceMemory: resource.NewQuantity(node.Status.Allocatable.Memory().Value()-nodeUtilization[v1.ResourceMemory].Value(), resource.BinarySI),
@@ -265,31 +273,34 @@ func nodeAvailableResources(nodeIndexer podutil.GetPodsAssignedToNodeFunc, node
}
// NodeUtilization returns the resources requested by the given pods. Only resources supplied in the resourceNames parameter are calculated.
-func NodeUtilization(pods []*v1.Pod, resourceNames []v1.ResourceName) map[v1.ResourceName]*resource.Quantity {
+func NodeUtilization(pods []*v1.Pod, resourceNames []v1.ResourceName, podUtilization podutil.PodUtilizationFnc) (map[v1.ResourceName]*resource.Quantity, error) {
-totalReqs := map[v1.ResourceName]*resource.Quantity{
+totalUtilization := map[v1.ResourceName]*resource.Quantity{
v1.ResourceCPU: resource.NewMilliQuantity(0, resource.DecimalSI),
v1.ResourceMemory: resource.NewQuantity(0, resource.BinarySI),
v1.ResourcePods: resource.NewQuantity(int64(len(pods)), resource.DecimalSI),
}
for _, name := range resourceNames {
if !IsBasicResource(name) {
-totalReqs[name] = resource.NewQuantity(0, resource.DecimalSI)
+totalUtilization[name] = resource.NewQuantity(0, resource.DecimalSI)
}
}
for _, pod := range pods {
-req, _ := utils.PodRequestsAndLimits(pod)
+podUtil, err := podUtilization(pod)
+if err != nil {
+return nil, err
+}
for _, name := range resourceNames {
-quantity, ok := req[name]
+quantity, ok := podUtil[name]
if ok && name != v1.ResourcePods {
// As Quantity.Add says: Add adds the provided y quantity to the current value. If the current value is zero,
// the format of the quantity will be updated to the format of y.
-totalReqs[name].Add(quantity)
+totalUtilization[name].Add(quantity)
}
}
}
-return totalReqs
+return totalUtilization, nil
}
// IsBasicResource checks if resource is basic native.

View File

@@ -39,6 +39,9 @@ type FilterFunc func(*v1.Pod) bool
// as input and returns the pods that assigned to the node.
type GetPodsAssignedToNodeFunc func(string, FilterFunc) ([]*v1.Pod, error)
+// PodUtilizationFnc is a function for getting pod's utilization. E.g. requested resources of utilization from metrics.
+type PodUtilizationFnc func(pod *v1.Pod) (v1.ResourceList, error)
// WrapFilterFuncs wraps a set of FilterFunc in one.
func WrapFilterFuncs(filters ...FilterFunc) FilterFunc {
return func(pod *v1.Pod) bool {
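PodUtilizationFnc is the seam that lets the same node-utilization code run on either requested resources or live metrics. Below is a minimal requests-backed implementation, mirroring the inline closure wired into fitsRequest in the previous file; the package name is only for illustration, and the metrics-backed counterpart is provided by the usage client added in this series.

package podusagesketch

import (
	v1 "k8s.io/api/core/v1"

	podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
	"sigs.k8s.io/descheduler/pkg/utils"
)

// requestedUtilization satisfies podutil.PodUtilizationFnc by summing the
// pod's container requests; a metrics-backed variant would return the pod's
// measured usage instead.
var requestedUtilization podutil.PodUtilizationFnc = func(pod *v1.Pod) (v1.ResourceList, error) {
	requests, _ := utils.PodRequestsAndLimits(pod)
	return requests, nil
}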

View File

@@ -8,6 +8,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
+"sigs.k8s.io/descheduler/pkg/descheduler/metricscollector"
podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
)
@@ -18,6 +19,7 @@ type HandleImpl struct {
SharedInformerFactoryImpl informers.SharedInformerFactory
EvictorFilterImpl frameworktypes.EvictorPlugin
PodEvictorImpl *evictions.PodEvictor
+MetricsCollectorImpl *metricscollector.MetricsCollector
}
var _ frameworktypes.Handle = &HandleImpl{}
@@ -26,6 +28,10 @@ func (hi *HandleImpl) ClientSet() clientset.Interface {
return hi.ClientsetImpl
}
+func (hi *HandleImpl) MetricsCollector() *metricscollector.MetricsCollector {
+return hi.MetricsCollectorImpl
+}
func (hi *HandleImpl) GetPodsAssignedToNodeFunc() podutil.GetPodsAssignedToNodeFunc {
return hi.GetPodsAssignedToNodeFuncImpl
}

View File

@@ -44,6 +44,7 @@ type HighNodeUtilization struct {
underutilizationCriteria []interface{}
resourceNames []v1.ResourceName
targetThresholds api.ResourceThresholds
+usageSnapshot usageClient
}
var _ frameworktypes.BalancePlugin = &HighNodeUtilization{}
@@ -84,6 +85,7 @@ func NewHighNodeUtilization(args runtime.Object, handle frameworktypes.Handle) (
targetThresholds: targetThresholds,
underutilizationCriteria: underutilizationCriteria,
podFilter: podFilter,
+usageSnapshot: newRequestedUsageSnapshot(resourceNames, handle.GetPodsAssignedToNodeFunc()),
}, nil
}
@@ -94,9 +96,15 @@ func (h *HighNodeUtilization) Name() string {
// Balance extension point implementation for the plugin
func (h *HighNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status {
+if err := h.usageSnapshot.capture(nodes); err != nil {
+return &frameworktypes.Status{
+Err: fmt.Errorf("error getting node usage: %v", err),
+}
+}
sourceNodes, highNodes := classifyNodes(
-getNodeUsage(nodes, h.resourceNames, h.handle.GetPodsAssignedToNodeFunc()),
+getNodeUsage(nodes, h.usageSnapshot),
-getNodeThresholds(nodes, h.args.Thresholds, h.targetThresholds, h.resourceNames, h.handle.GetPodsAssignedToNodeFunc(), false),
+getNodeThresholds(nodes, h.args.Thresholds, h.targetThresholds, h.resourceNames, false, h.usageSnapshot),
func(node *v1.Node, usage NodeUsage, threshold NodeThresholds) bool {
return isNodeWithLowUtilization(usage, threshold.lowResourceThreshold)
},
@@ -152,7 +160,9 @@ func (h *HighNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *fr
evictions.EvictOptions{StrategyName: HighNodeUtilizationPluginName},
h.podFilter,
h.resourceNames,
-continueEvictionCond)
+continueEvictionCond,
+h.usageSnapshot,
+)
return nil
}

View File

@@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
@@ -43,6 +44,7 @@ type LowNodeUtilization struct {
underutilizationCriteria []interface{}
overutilizationCriteria []interface{}
resourceNames []v1.ResourceName
+usageSnapshot usageClient
}
var _ frameworktypes.BalancePlugin = &LowNodeUtilization{}
@@ -85,13 +87,23 @@ func NewLowNodeUtilization(args runtime.Object, handle frameworktypes.Handle) (f
return nil, fmt.Errorf("error initializing pod filter function: %v", err)
}
+resourceNames := getResourceNames(lowNodeUtilizationArgsArgs.Thresholds)
+var usageSnapshot usageClient
+if lowNodeUtilizationArgsArgs.MetricsUtilization.MetricsServer {
+usageSnapshot = newActualUsageSnapshot(resourceNames, handle.GetPodsAssignedToNodeFunc(), handle.MetricsCollector())
+} else {
+usageSnapshot = newRequestedUsageSnapshot(resourceNames, handle.GetPodsAssignedToNodeFunc())
+}
return &LowNodeUtilization{
handle: handle,
args: lowNodeUtilizationArgsArgs,
underutilizationCriteria: underutilizationCriteria,
overutilizationCriteria: overutilizationCriteria,
-resourceNames: getResourceNames(lowNodeUtilizationArgsArgs.Thresholds),
+resourceNames: resourceNames,
podFilter: podFilter,
+usageSnapshot: usageSnapshot,
}, nil
}
@@ -102,9 +114,15 @@ func (l *LowNodeUtilization) Name() string {
// Balance extension point implementation for the plugin
func (l *LowNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status {
+if err := l.usageSnapshot.capture(nodes); err != nil {
+return &frameworktypes.Status{
+Err: fmt.Errorf("error getting node usage: %v", err),
+}
+}
lowNodes, sourceNodes := classifyNodes(
-getNodeUsage(nodes, l.resourceNames, l.handle.GetPodsAssignedToNodeFunc()),
+getNodeUsage(nodes, l.usageSnapshot),
-getNodeThresholds(nodes, l.args.Thresholds, l.args.TargetThresholds, l.resourceNames, l.handle.GetPodsAssignedToNodeFunc(), l.args.UseDeviationThresholds),
+getNodeThresholds(nodes, l.args.Thresholds, l.args.TargetThresholds, l.resourceNames, l.args.UseDeviationThresholds, l.usageSnapshot),
// The node has to be schedulable (to be able to move workload there)
func(node *v1.Node, usage NodeUsage, threshold NodeThresholds) bool {
if nodeutil.IsNodeUnschedulable(node) {
@@ -172,7 +190,9 @@ func (l *LowNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *fra
evictions.EvictOptions{StrategyName: LowNodeUtilizationPluginName},
l.podFilter,
l.resourceNames,
-continueEvictionCond)
+continueEvictionCond,
+l.usageSnapshot,
+)
return nil
}

View File

@@ -18,8 +18,12 @@ package nodeutilization
import (
"context"
+"crypto/tls"
"fmt"
+"net"
+"net/http"
"testing"
+"time"
"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/framework/plugins/defaultevictor"
@@ -32,10 +36,18 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
+"k8s.io/metrics/pkg/apis/metrics/v1beta1"
+fakemetricsclient "k8s.io/metrics/pkg/client/clientset/versioned/fake"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
+"sigs.k8s.io/descheduler/pkg/descheduler/metricscollector"
"sigs.k8s.io/descheduler/pkg/utils"
"sigs.k8s.io/descheduler/test"
+promapi "github.com/prometheus/client_golang/api"
+promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
+"github.com/prometheus/common/config"
+"github.com/prometheus/common/model"
)
func TestLowNodeUtilization(t *testing.T) {
@@ -53,7 +65,10 @@ func TestLowNodeUtilization(t *testing.T) {
thresholds, targetThresholds api.ResourceThresholds
nodes []*v1.Node
pods []*v1.Pod
+nodemetricses []*v1beta1.NodeMetrics
+podmetricses []*v1beta1.PodMetrics
expectedPodsEvicted uint
+expectedPodsWithMetricsEvicted uint
evictedPods []string
evictableNamespaces *api.Namespaces
}{
@@ -103,7 +118,20 @@
}),
test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef),
},
nodemetricses: []*v1beta1.NodeMetrics{
test.BuildNodeMetrics(n1NodeName, 2401, 1714978816),
test.BuildNodeMetrics(n2NodeName, 401, 1714978816),
test.BuildNodeMetrics(n3NodeName, 10, 1714978816),
},
podmetricses: []*v1beta1.PodMetrics{
test.BuildPodMetrics("p1", 401, 0),
test.BuildPodMetrics("p2", 401, 0),
test.BuildPodMetrics("p3", 401, 0),
test.BuildPodMetrics("p4", 401, 0),
test.BuildPodMetrics("p5", 401, 0),
},
expectedPodsEvicted: 0,
+expectedPodsWithMetricsEvicted: 0,
},
{
name: "without priorities",
@@ -153,7 +181,20 @@
}),
test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef),
},
nodemetricses: []*v1beta1.NodeMetrics{
test.BuildNodeMetrics(n1NodeName, 3201, 0),
test.BuildNodeMetrics(n2NodeName, 401, 0),
test.BuildNodeMetrics(n3NodeName, 11, 0),
},
podmetricses: []*v1beta1.PodMetrics{
test.BuildPodMetrics("p1", 401, 0),
test.BuildPodMetrics("p2", 401, 0),
test.BuildPodMetrics("p3", 401, 0),
test.BuildPodMetrics("p4", 401, 0),
test.BuildPodMetrics("p5", 401, 0),
},
expectedPodsEvicted: 4,
+expectedPodsWithMetricsEvicted: 4,
},
{
name: "without priorities, but excluding namespaces",
@@ -218,12 +259,25 @@
}),
test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef),
},
nodemetricses: []*v1beta1.NodeMetrics{
test.BuildNodeMetrics(n1NodeName, 3201, 0),
test.BuildNodeMetrics(n2NodeName, 401, 0),
test.BuildNodeMetrics(n3NodeName, 11, 0),
},
podmetricses: []*v1beta1.PodMetrics{
test.BuildPodMetrics("p1", 401, 0),
test.BuildPodMetrics("p2", 401, 0),
test.BuildPodMetrics("p3", 401, 0),
test.BuildPodMetrics("p4", 401, 0),
test.BuildPodMetrics("p5", 401, 0),
},
evictableNamespaces: &api.Namespaces{
Exclude: []string{
"namespace1",
},
},
expectedPodsEvicted: 0,
+expectedPodsWithMetricsEvicted: 0,
},
{
name: "without priorities, but include only default namespace",
@@ -283,12 +337,25 @@
}),
test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef),
},
nodemetricses: []*v1beta1.NodeMetrics{
test.BuildNodeMetrics(n1NodeName, 3201, 0),
test.BuildNodeMetrics(n2NodeName, 401, 0),
test.BuildNodeMetrics(n3NodeName, 11, 0),
},
podmetricses: []*v1beta1.PodMetrics{
test.BuildPodMetrics("p1", 401, 0),
test.BuildPodMetrics("p2", 401, 0),
test.BuildPodMetrics("p3", 401, 0),
test.BuildPodMetrics("p4", 401, 0),
test.BuildPodMetrics("p5", 401, 0),
},
evictableNamespaces: &api.Namespaces{
Include: []string{
"default",
},
},
expectedPodsEvicted: 2,
+expectedPodsWithMetricsEvicted: 2,
},
{
name: "without priorities stop when cpu capacity is depleted",
@@ -306,14 +373,14 @@
test.BuildTestNode(n3NodeName, 4000, 3000, 10, test.SetNodeUnschedulable),
},
pods: []*v1.Pod{
test.BuildTestPod("p1", 400, 300, n1NodeName, test.SetRSOwnerRef), test.BuildTestPod("p1", 400, 0, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p2", 400, 300, n1NodeName, test.SetRSOwnerRef), test.BuildTestPod("p2", 400, 0, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p3", 400, 300, n1NodeName, test.SetRSOwnerRef), test.BuildTestPod("p3", 400, 0, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p4", 400, 300, n1NodeName, test.SetRSOwnerRef), test.BuildTestPod("p4", 400, 0, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p5", 400, 300, n1NodeName, test.SetRSOwnerRef), test.BuildTestPod("p5", 400, 0, n1NodeName, test.SetRSOwnerRef),
// These won't be evicted. // These won't be evicted.
test.BuildTestPod("p6", 400, 300, n1NodeName, test.SetDSOwnerRef), test.BuildTestPod("p6", 400, 0, n1NodeName, test.SetDSOwnerRef),
test.BuildTestPod("p7", 400, 300, n1NodeName, func(pod *v1.Pod) { test.BuildTestPod("p7", 400, 0, n1NodeName, func(pod *v1.Pod) {
// A pod with local storage. // A pod with local storage.
test.SetNormalOwnerRef(pod) test.SetNormalOwnerRef(pod)
pod.Spec.Volumes = []v1.Volume{ pod.Spec.Volumes = []v1.Volume{
@@ -330,17 +397,29 @@ func TestLowNodeUtilization(t *testing.T) {
// A Mirror Pod. // A Mirror Pod.
pod.Annotations = test.GetMirrorPodAnnotation() pod.Annotations = test.GetMirrorPodAnnotation()
}), }),
test.BuildTestPod("p8", 400, 300, n1NodeName, func(pod *v1.Pod) { test.BuildTestPod("p8", 400, 0, n1NodeName, func(pod *v1.Pod) {
// A Critical Pod. // A Critical Pod.
test.SetNormalOwnerRef(pod) test.SetNormalOwnerRef(pod)
pod.Namespace = "kube-system" pod.Namespace = "kube-system"
priority := utils.SystemCriticalPriority priority := utils.SystemCriticalPriority
pod.Spec.Priority = &priority pod.Spec.Priority = &priority
}), }),
test.BuildTestPod("p9", 400, 2100, n2NodeName, test.SetRSOwnerRef), test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef),
}, },
// 4 pods available for eviction based on v1.ResourcePods, only 3 pods can be evicted before cpu is depleted nodemetricses: []*v1beta1.NodeMetrics{
expectedPodsEvicted: 3, test.BuildNodeMetrics(n1NodeName, 3201, 0),
test.BuildNodeMetrics(n2NodeName, 401, 0),
test.BuildNodeMetrics(n3NodeName, 0, 0),
},
podmetricses: []*v1beta1.PodMetrics{
test.BuildPodMetrics("p1", 401, 0),
test.BuildPodMetrics("p2", 401, 0),
test.BuildPodMetrics("p3", 401, 0),
test.BuildPodMetrics("p4", 401, 0),
test.BuildPodMetrics("p5", 401, 0),
},
expectedPodsEvicted: 4,
expectedPodsWithMetricsEvicted: 4,
}, },
{
name: "with priorities",
@@ -410,7 +489,20 @@
}),
test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef),
},
nodemetricses: []*v1beta1.NodeMetrics{
test.BuildNodeMetrics(n1NodeName, 3201, 0),
test.BuildNodeMetrics(n2NodeName, 401, 0),
test.BuildNodeMetrics(n3NodeName, 11, 0),
},
podmetricses: []*v1beta1.PodMetrics{
test.BuildPodMetrics("p1", 401, 0),
test.BuildPodMetrics("p2", 401, 0),
test.BuildPodMetrics("p3", 401, 0),
test.BuildPodMetrics("p4", 401, 0),
test.BuildPodMetrics("p5", 401, 0),
},
expectedPodsEvicted: 4,
+expectedPodsWithMetricsEvicted: 4,
},
{
name: "without priorities evicting best-effort pods only",
@@ -478,7 +570,20 @@
}),
test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef),
},
nodemetricses: []*v1beta1.NodeMetrics{
test.BuildNodeMetrics(n1NodeName, 3201, 0),
test.BuildNodeMetrics(n2NodeName, 401, 0),
test.BuildNodeMetrics(n3NodeName, 11, 0),
},
podmetricses: []*v1beta1.PodMetrics{
test.BuildPodMetrics("p1", 401, 0),
test.BuildPodMetrics("p2", 401, 0),
test.BuildPodMetrics("p3", 401, 0),
test.BuildPodMetrics("p4", 401, 0),
test.BuildPodMetrics("p5", 401, 0),
},
expectedPodsEvicted: 4,
+expectedPodsWithMetricsEvicted: 4,
evictedPods: []string{"p1", "p2", "p4", "p5"},
},
{
@@ -558,8 +663,21 @@
test.SetPodExtendedResourceRequest(pod, extendedResource, 1)
}),
},
nodemetricses: []*v1beta1.NodeMetrics{
test.BuildNodeMetrics(n1NodeName, 3201, 0),
test.BuildNodeMetrics(n2NodeName, 401, 0),
test.BuildNodeMetrics(n3NodeName, 11, 0),
},
podmetricses: []*v1beta1.PodMetrics{
test.BuildPodMetrics("p1", 401, 0),
test.BuildPodMetrics("p2", 401, 0),
test.BuildPodMetrics("p3", 401, 0),
test.BuildPodMetrics("p4", 401, 0),
test.BuildPodMetrics("p5", 401, 0),
},
// 4 pods available for eviction based on v1.ResourcePods, only 3 pods can be evicted before extended resource is depleted
expectedPodsEvicted: 3,
+expectedPodsWithMetricsEvicted: 0,
},
{
name: "with extended resource in some of nodes",
@@ -586,8 +704,21 @@
}),
test.BuildTestPod("p9", 0, 0, n2NodeName, test.SetRSOwnerRef),
},
nodemetricses: []*v1beta1.NodeMetrics{
test.BuildNodeMetrics(n1NodeName, 3201, 0),
test.BuildNodeMetrics(n2NodeName, 401, 0),
test.BuildNodeMetrics(n3NodeName, 11, 0),
},
podmetricses: []*v1beta1.PodMetrics{
test.BuildPodMetrics("p1", 401, 0),
test.BuildPodMetrics("p2", 401, 0),
test.BuildPodMetrics("p3", 401, 0),
test.BuildPodMetrics("p4", 401, 0),
test.BuildPodMetrics("p5", 401, 0),
},
// 0 pods available for eviction because there's no enough extended resource in node2
expectedPodsEvicted: 0,
+expectedPodsWithMetricsEvicted: 0,
},
{
name: "without priorities, but only other node is unschedulable",
@@ -636,7 +767,19 @@
pod.Spec.Priority = &priority
}),
},
nodemetricses: []*v1beta1.NodeMetrics{
test.BuildNodeMetrics(n1NodeName, 3201, 0),
test.BuildNodeMetrics(n2NodeName, 401, 0),
},
podmetricses: []*v1beta1.PodMetrics{
test.BuildPodMetrics("p1", 401, 0),
test.BuildPodMetrics("p2", 401, 0),
test.BuildPodMetrics("p3", 401, 0),
test.BuildPodMetrics("p4", 401, 0),
test.BuildPodMetrics("p5", 401, 0),
},
expectedPodsEvicted: 0,
+expectedPodsWithMetricsEvicted: 0,
},
{
name: "without priorities, but only other node doesn't match pod node selector for p4 and p5",
@@ -701,7 +844,17 @@
pod.Spec.Priority = &priority
}),
},
nodemetricses: []*v1beta1.NodeMetrics{
test.BuildNodeMetrics(n1NodeName, 3201, 0),
test.BuildNodeMetrics(n2NodeName, 401, 0),
},
podmetricses: []*v1beta1.PodMetrics{
test.BuildPodMetrics("p1", 401, 0),
test.BuildPodMetrics("p2", 401, 0),
test.BuildPodMetrics("p3", 401, 0),
},
expectedPodsEvicted: 3,
+expectedPodsWithMetricsEvicted: 3,
},
{
name: "without priorities, but only other node doesn't match pod node affinity for p4 and p5",
@@ -795,7 +948,17 @@
}),
test.BuildTestPod("p9", 0, 0, n2NodeName, test.SetRSOwnerRef),
},
nodemetricses: []*v1beta1.NodeMetrics{
test.BuildNodeMetrics(n1NodeName, 3201, 0),
test.BuildNodeMetrics(n2NodeName, 401, 0),
},
podmetricses: []*v1beta1.PodMetrics{
test.BuildPodMetrics("p1", 401, 0),
test.BuildPodMetrics("p2", 401, 0),
test.BuildPodMetrics("p3", 401, 0),
},
expectedPodsEvicted: 3,
+expectedPodsWithMetricsEvicted: 3,
},
{
name: "deviation thresholds",
@@ -847,13 +1010,131 @@
}),
test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef),
},
nodemetricses: []*v1beta1.NodeMetrics{
test.BuildNodeMetrics(n1NodeName, 3201, 0),
test.BuildNodeMetrics(n2NodeName, 401, 0),
test.BuildNodeMetrics(n3NodeName, 11, 0),
},
podmetricses: []*v1beta1.PodMetrics{
test.BuildPodMetrics("p1", 401, 0),
test.BuildPodMetrics("p2", 401, 0),
test.BuildPodMetrics("p3", 401, 0),
test.BuildPodMetrics("p4", 401, 0),
test.BuildPodMetrics("p5", 401, 0),
},
expectedPodsEvicted: 2,
+expectedPodsWithMetricsEvicted: 2,
evictedPods: []string{},
},
{
name: "without priorities different evictions for requested and actual resources",
thresholds: api.ResourceThresholds{
v1.ResourceCPU: 30,
v1.ResourcePods: 30,
},
targetThresholds: api.ResourceThresholds{
v1.ResourceCPU: 50,
v1.ResourcePods: 50,
},
nodes: []*v1.Node{
test.BuildTestNode(n1NodeName, 4000, 3000, 9, nil),
test.BuildTestNode(n2NodeName, 4000, 3000, 10, func(node *v1.Node) {
node.ObjectMeta.Labels = map[string]string{
nodeSelectorKey: notMatchingNodeSelectorValue,
}
}),
},
pods: []*v1.Pod{
test.BuildTestPod("p1", 400, 0, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p2", 400, 0, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p3", 400, 0, n1NodeName, test.SetRSOwnerRef),
// These won't be evicted.
test.BuildTestPod("p4", 400, 0, n1NodeName, func(pod *v1.Pod) {
// A pod with affinity to run in the "west" datacenter upon scheduling
test.SetNormalOwnerRef(pod)
pod.Spec.Affinity = &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: nodeSelectorKey,
Operator: "In",
Values: []string{nodeSelectorValue},
},
},
},
},
},
},
}
}),
test.BuildTestPod("p5", 400, 0, n1NodeName, func(pod *v1.Pod) {
// A pod with affinity to run in the "west" datacenter upon scheduling
test.SetNormalOwnerRef(pod)
pod.Spec.Affinity = &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: nodeSelectorKey,
Operator: "In",
Values: []string{nodeSelectorValue},
},
},
},
},
},
},
}
}),
test.BuildTestPod("p6", 400, 0, n1NodeName, test.SetDSOwnerRef),
test.BuildTestPod("p7", 400, 0, n1NodeName, func(pod *v1.Pod) {
// A pod with local storage.
test.SetNormalOwnerRef(pod)
pod.Spec.Volumes = []v1.Volume{
{
Name: "sample",
VolumeSource: v1.VolumeSource{
HostPath: &v1.HostPathVolumeSource{Path: "somePath"},
EmptyDir: &v1.EmptyDirVolumeSource{
SizeLimit: resource.NewQuantity(int64(10), resource.BinarySI),
},
},
},
}
// A Mirror Pod.
pod.Annotations = test.GetMirrorPodAnnotation()
}),
test.BuildTestPod("p8", 400, 0, n1NodeName, func(pod *v1.Pod) {
// A Critical Pod.
test.SetNormalOwnerRef(pod)
pod.Namespace = "kube-system"
priority := utils.SystemCriticalPriority
pod.Spec.Priority = &priority
}),
test.BuildTestPod("p9", 0, 0, n2NodeName, test.SetRSOwnerRef),
},
nodemetricses: []*v1beta1.NodeMetrics{
test.BuildNodeMetrics(n1NodeName, 3201, 0),
test.BuildNodeMetrics(n2NodeName, 401, 0),
},
podmetricses: []*v1beta1.PodMetrics{
test.BuildPodMetrics("p1", 801, 0),
test.BuildPodMetrics("p2", 801, 0),
test.BuildPodMetrics("p3", 801, 0),
},
expectedPodsEvicted: 3,
expectedPodsWithMetricsEvicted: 2,
},
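// Expectation note: with metrics enabled each evictable pod reports roughly 801m CPU of actual
// usage against a 400m request, so the spare capacity under n2's 50% CPU target threshold
// (2000m minus the 401m reported for n2) only fits two pods, while the request-based run
// still evicts all three removable pods.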
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
testFnc := func(metricsEnabled bool, expectedPodsEvicted uint) func(t *testing.T) {
return func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -864,7 +1145,21 @@ func TestLowNodeUtilization(t *testing.T) {
for _, pod := range tc.pods {
objs = append(objs, pod)
}
var metricsObjs []runtime.Object
for _, nodemetrics := range tc.nodemetricses {
metricsObjs = append(metricsObjs, nodemetrics)
}
for _, podmetrics := range tc.podmetricses {
metricsObjs = append(metricsObjs, podmetrics)
}
fakeClient := fake.NewSimpleClientset(objs...)
metricsClientset := fakemetricsclient.NewSimpleClientset(metricsObjs...)
collector := metricscollector.NewMetricsCollector(fakeClient, metricsClientset)
err := collector.Collect(ctx)
if err != nil {
t.Fatalf("unable to collect metrics: %v", err)
}
podsForEviction := make(map[string]struct{})
for _, pod := range tc.evictedPods {
@@ -891,12 +1186,16 @@ func TestLowNodeUtilization(t *testing.T) {
if err != nil {
t.Fatalf("Unable to initialize a framework handle: %v", err)
}
handle.MetricsCollectorImpl = collector
plugin, err := NewLowNodeUtilization(&LowNodeUtilizationArgs{
Thresholds: tc.thresholds,
TargetThresholds: tc.targetThresholds,
UseDeviationThresholds: tc.useDeviationThresholds,
EvictableNamespaces: tc.evictableNamespaces,
MetricsUtilization: MetricsUtilization{
MetricsServer: metricsEnabled,
},
},
handle)
if err != nil {
@@ -905,13 +1204,16 @@ func TestLowNodeUtilization(t *testing.T) {
plugin.(frameworktypes.BalancePlugin).Balance(ctx, tc.nodes)
podsEvicted := podEvictor.TotalEvicted()
if tc.expectedPodsEvicted != podsEvicted {
t.Errorf("Expected %v pods to be evicted but %v got evicted", tc.expectedPodsEvicted, podsEvicted)
if expectedPodsEvicted != podsEvicted {
t.Errorf("Expected %v pods to be evicted but %v got evicted", expectedPodsEvicted, podsEvicted)
}
if evictionFailed {
t.Errorf("Pod evictions failed unexpectedly")
}
})
}
}
t.Run(tc.name, testFnc(false, tc.expectedPodsEvicted))
t.Run(tc.name+" with metrics enabled", testFnc(true, tc.expectedPodsWithMetricsEvicted))
}
}
@@ -1067,3 +1369,62 @@ func TestLowNodeUtilizationWithTaints(t *testing.T) {
})
}
}
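// Note: the test below returns immediately, so its body is effectively disabled; it reads like a
// manual experiment against a live cluster's Prometheus endpoint rather than an automated test.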
func TestLowNodeUtilizationWithMetrics(t *testing.T) {
return
roundTripper := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
TLSHandshakeTimeout: 10 * time.Second,
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
AuthToken := "eyJhbGciOiJSUzI1NiIsImtpZCI6IkNoTW9tT2w2cWtzR2V0dURZdjBqdnBSdmdWM29lWmc3dWpfNW0yaDc2NHMifQ.eyJhdWQiOlsiaHR0cHM6Ly9rdWJlcm5ldGVzLmRlZmF1bHQuc3ZjIl0sImV4cCI6MTcyODk5MjY3NywiaWF0IjoxNzI4OTg5MDc3LCJpc3MiOiJodHRwczovL2t1YmVybmV0ZXMuZGVmYXVsdC5zdmMiLCJqdGkiOiJkNDY3ZjVmMy0xNGVmLTRkMjItOWJkNC1jMGM1Mzk3NzYyZDgiLCJrdWJlcm5ldGVzLmlvIjp7Im5hbWVzcGFjZSI6Im9wZW5zaGlmdC1tb25pdG9yaW5nIiwic2VydmljZWFjY291bnQiOnsibmFtZSI6InByb21ldGhldXMtazhzIiwidWlkIjoiNjY4NDllMGItYTAwZC00NjUzLWE5NTItNThiYTE1MTk4NTlkIn19LCJuYmYiOjE3Mjg5ODkwNzcsInN1YiI6InN5c3RlbTpzZXJ2aWNlYWNjb3VudDpvcGVuc2hpZnQtbW9uaXRvcmluZzpwcm9tZXRoZXVzLWs4cyJ9.J1i6-oRAC9J8mqrlZPKGA-CU5PbUzhm2QxAWFnu65-NXR3e252mesybwtjkwxUtTLKrsYHQXwEsG5rGcQsvMcGK9RC9y5z33DFj8tPPwOGLJYJ-s5cTImTqKtWRXzTlcrsrUYTYApfrOsEyXwyfDow4PCslZjR3cd5FMRbvXNqHLg26nG_smApR4wc6kXy7xxlRuGhxu-dUiscQP56njboOK61JdTG8F3FgOayZnKk1jGeVdIhXClqGWJyokk-ZM3mMK1MxzGXY0tLbe37V4B7g3NDiH651BUcicfDSky46yfcAYxMDbZgpK2TByWApAllN0wixz2WsFfyBVu_Q5xtZ9Gi9BUHSa5ioRiBK346W4Bdmr9ala5ldIXDa59YE7UB34DsCHyqvzRx_Sj76hLzy2jSOk7RsL0fM8sDoJL4ROdi-3Jtr5uPY593I8H8qeQvFS6PQfm0bUZqVKrrLoCK_uk9guH4a6K27SlD-Utk3dpsjbmrwcjBxm-zd_LE9YyQ734My00Pcy9D5eNio3gESjGsHqGFc_haq4ZCiVOCkbdmABjpPEL6K7bs1GMZbHt1CONL0-LzymM8vgGNj0grjpG8-5AF8ZuSqR7pbZSV_NO2nKkmrwpILCw0Joqp6V3C9pP9nXWHIDyVMxMK870zxzt_qCoPRLCAujQQn6e0U"
client, err := promapi.NewClient(promapi.Config{
Address: "https://prometheus-k8s-openshift-monitoring.apps.jchaloup-20241015-3.group-b.devcluster.openshift.com",
RoundTripper: config.NewAuthorizationCredentialsRoundTripper("Bearer", config.NewInlineSecret(AuthToken), roundTripper),
})
if err != nil {
t.Fatalf("prom client error: %v", err)
}
// pod:container_cpu_usage:sum
// container_memory_usage_bytes
v1api := promv1.NewAPI(client)
ctx := context.TODO()
// promQuery := "avg_over_time(kube_pod_container_resource_requests[1m])"
promQuery := "kube_pod_container_resource_requests"
results, warnings, err := v1api.Query(ctx, promQuery, time.Now())
fmt.Printf("results: %#v\n", results)
for _, sample := range results.(model.Vector) {
fmt.Printf("sample: %#v\n", sample)
}
fmt.Printf("warnings: %v\n", warnings)
fmt.Printf("err: %v\n", err)
result := model.Value(
&model.Vector{
&model.Sample{
Metric: model.Metric{
"container": "kube-controller-manager",
"endpoint": "https-main",
"job": "kube-state-metrics",
"namespace": "openshift-kube-controller-manager",
"node": "ip-10-0-72-168.us-east-2.compute.internal",
"pod": "kube-controller-manager-ip-10-0-72-168.us-east-2.compute.internal",
"resource": "cpu",
"service": "kube-state-metrics",
"uid": "ae46c09f-ade7-4133-9ee8-cf45ac78ca6d",
"unit": "core",
},
Value: 0.06,
Timestamp: 1728991761711,
},
},
)
fmt.Printf("result: %#v\n", result)
}


@@ -78,14 +78,14 @@ func getNodeThresholds(
nodes []*v1.Node,
lowThreshold, highThreshold api.ResourceThresholds,
resourceNames []v1.ResourceName,
getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc,
useDeviationThresholds bool,
usageClient usageClient,
) map[string]NodeThresholds {
nodeThresholdsMap := map[string]NodeThresholds{}
averageResourceUsagePercent := api.ResourceThresholds{}
if useDeviationThresholds {
averageResourceUsagePercent = averageNodeBasicresources(nodes, getPodsAssignedToNode, resourceNames)
averageResourceUsagePercent = averageNodeBasicresources(nodes, usageClient)
}
for _, node := range nodes {
@@ -121,22 +121,15 @@ func getNodeThresholds(
func getNodeUsage(
nodes []*v1.Node,
resourceNames []v1.ResourceName,
getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc,
usageClient usageClient,
) []NodeUsage {
var nodeUsageList []NodeUsage
for _, node := range nodes {
pods, err := podutil.ListPodsOnANode(node.Name, getPodsAssignedToNode, nil)
if err != nil {
klog.V(2).InfoS("Node will not be processed, error accessing its pods", "node", klog.KObj(node), "err", err)
continue
}
nodeUsageList = append(nodeUsageList, NodeUsage{
node: node,
usage: nodeutil.NodeUtilization(pods, resourceNames),
allPods: pods,
usage: usageClient.nodeUtilization(node.Name),
allPods: usageClient.pods(node.Name),
})
}
@@ -226,6 +219,7 @@ func evictPodsFromSourceNodes(
podFilter func(pod *v1.Pod) bool,
resourceNames []v1.ResourceName,
continueEviction continueEvictionCond,
usageSnapshot usageClient,
) {
// upper bound on total number of pods/cpu/memory and optional extended resources to be moved
totalAvailableUsage := map[v1.ResourceName]*resource.Quantity{
@@ -243,6 +237,10 @@ func evictPodsFromSourceNodes(
totalAvailableUsage[name] = resource.NewQuantity(0, resource.DecimalSI)
}
totalAvailableUsage[name].Add(*node.thresholds.highResourceThreshold[name])
if _, exists := node.usage[name]; !exists {
klog.Errorf("unable to find %q resource in node's %q usage, terminating eviction", name, node.node.Name)
return
}
totalAvailableUsage[name].Sub(*node.usage[name])
}
}
@@ -274,7 +272,7 @@ func evictPodsFromSourceNodes(
klog.V(1).InfoS("Evicting pods based on priority, if they have same priority, they'll be evicted based on QoS tiers") klog.V(1).InfoS("Evicting pods based on priority, if they have same priority, they'll be evicted based on QoS tiers")
// sort the evictable Pods based on priority. This also sorts them based on QoS. If there are multiple pods with same priority, they are sorted based on QoS tiers. // sort the evictable Pods based on priority. This also sorts them based on QoS. If there are multiple pods with same priority, they are sorted based on QoS tiers.
podutil.SortPodsBasedOnPriorityLowToHigh(removablePods) podutil.SortPodsBasedOnPriorityLowToHigh(removablePods)
err := evictPods(ctx, evictableNamespaces, removablePods, node, totalAvailableUsage, taintsOfDestinationNodes, podEvictor, evictOptions, continueEviction) err := evictPods(ctx, evictableNamespaces, removablePods, node, totalAvailableUsage, taintsOfDestinationNodes, podEvictor, evictOptions, continueEviction, usageSnapshot)
if err != nil { if err != nil {
switch err.(type) { switch err.(type) {
case *evictions.EvictionTotalLimitError: case *evictions.EvictionTotalLimitError:
@@ -295,6 +293,7 @@ func evictPods(
podEvictor frameworktypes.Evictor,
evictOptions evictions.EvictOptions,
continueEviction continueEvictionCond,
usageSnapshot usageClient,
) error {
var excludedNamespaces sets.Set[string]
if evictableNamespaces != nil {
@@ -320,18 +319,21 @@ func evictPods(
if !preEvictionFilterWithOptions(pod) {
continue
}
podUsage, err := usageSnapshot.podUsage(pod)
if err != nil {
klog.Errorf("unable to get pod usage for %v/%v: %v", pod.Namespace, pod.Name, err)
continue
}
err = podEvictor.Evict(ctx, pod, evictOptions)
if err == nil {
klog.V(3).InfoS("Evicted pods", "pod", klog.KObj(pod))
for name := range totalAvailableUsage {
if name == v1.ResourcePods {
nodeInfo.usage[name].Sub(*resource.NewQuantity(1, resource.DecimalSI))
totalAvailableUsage[name].Sub(*resource.NewQuantity(1, resource.DecimalSI))
} else {
quantity := utils.GetResourceRequestQuantity(pod, name)
nodeInfo.usage[name].Sub(quantity)
totalAvailableUsage[name].Sub(quantity)
nodeInfo.usage[name].Sub(*podUsage[name])
totalAvailableUsage[name].Sub(*podUsage[name])
}
}
@@ -437,17 +439,12 @@ func classifyPods(pods []*v1.Pod, filter func(pod *v1.Pod) bool) ([]*v1.Pod, []*
return nonRemovablePods, removablePods
}
func averageNodeBasicresources(nodes []*v1.Node, getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc, resourceNames []v1.ResourceName) api.ResourceThresholds {
func averageNodeBasicresources(nodes []*v1.Node, usageClient usageClient) api.ResourceThresholds {
total := api.ResourceThresholds{}
average := api.ResourceThresholds{}
numberOfNodes := len(nodes)
for _, node := range nodes {
pods, err := podutil.ListPodsOnANode(node.Name, getPodsAssignedToNode, nil)
if err != nil {
numberOfNodes--
continue
}
usage := nodeutil.NodeUtilization(pods, resourceNames)
usage := usageClient.nodeUtilization(node.Name)
nodeCapacity := node.Status.Capacity
if len(node.Status.Allocatable) > 0 {
nodeCapacity = node.Status.Allocatable


@@ -28,6 +28,7 @@ type LowNodeUtilizationArgs struct {
Thresholds api.ResourceThresholds `json:"thresholds"`
TargetThresholds api.ResourceThresholds `json:"targetThresholds"`
NumberOfNodes int `json:"numberOfNodes,omitempty"`
MetricsUtilization MetricsUtilization `json:"metricsUtilization,omitempty"`
// Naming this one differently since namespaces are still
// considered while considering resources used by pods
@@ -43,8 +44,17 @@ type HighNodeUtilizationArgs struct {
Thresholds api.ResourceThresholds `json:"thresholds"`
NumberOfNodes int `json:"numberOfNodes,omitempty"`
MetricsUtilization MetricsUtilization `json:"metricsUtilization,omitempty"`
// Naming this one differently since namespaces are still
// considered while considering resources used by pods
// but then filtered out before eviction
EvictableNamespaces *api.Namespaces `json:"evictableNamespaces,omitempty"`
}
// MetricsUtilization allows consuming actual resource utilization from metrics
type MetricsUtilization struct {
// metricsServer enables node and pod utilization taken from a Kubernetes metrics server.
// See https://kubernetes-sigs.github.io/metrics-server/ for details.
MetricsServer bool `json:"metricsServer,omitempty"`
}
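For illustration, a LowNodeUtilization configuration that opts into metrics-based utilization could be built roughly as follows (a sketch only; the threshold values are arbitrary and the api and v1 package names are assumed to be imported as in the surrounding code):
args := &LowNodeUtilizationArgs{
	Thresholds:       api.ResourceThresholds{v1.ResourceCPU: 20, v1.ResourcePods: 20},
	TargetThresholds: api.ResourceThresholds{v1.ResourceCPU: 70, v1.ResourcePods: 70},
	// Switch node and pod utilization from requests to actual usage reported by the metrics server.
	MetricsUtilization: MetricsUtilization{MetricsServer: true},
}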


@@ -0,0 +1,202 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodeutilization
import (
"context"
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
utilptr "k8s.io/utils/ptr"
"sigs.k8s.io/descheduler/pkg/descheduler/metricscollector"
nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
"sigs.k8s.io/descheduler/pkg/utils"
)
type usageClient interface {
nodeUtilization(node string) map[v1.ResourceName]*resource.Quantity
nodes() []*v1.Node
pods(node string) []*v1.Pod
capture(nodes []*v1.Node) error
podUsage(pod *v1.Pod) (map[v1.ResourceName]*resource.Quantity, error)
}
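// Note: both implementations below populate their internal maps in capture; the remaining
// methods only read the most recent snapshot, so capture is expected to be called first.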
type requestedUsageClient struct {
resourceNames []v1.ResourceName
getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc
_nodes []*v1.Node
_pods map[string][]*v1.Pod
_nodeUtilization map[string]map[v1.ResourceName]*resource.Quantity
}
var _ usageClient = &requestedUsageClient{}
func newRequestedUsageSnapshot(
resourceNames []v1.ResourceName,
getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc,
) *requestedUsageClient {
return &requestedUsageClient{
resourceNames: resourceNames,
getPodsAssignedToNode: getPodsAssignedToNode,
}
}
func (s *requestedUsageClient) nodeUtilization(node string) map[v1.ResourceName]*resource.Quantity {
return s._nodeUtilization[node]
}
func (s *requestedUsageClient) nodes() []*v1.Node {
return s._nodes
}
func (s *requestedUsageClient) pods(node string) []*v1.Pod {
return s._pods[node]
}
func (s *requestedUsageClient) podUsage(pod *v1.Pod) (map[v1.ResourceName]*resource.Quantity, error) {
usage := make(map[v1.ResourceName]*resource.Quantity)
for _, resourceName := range s.resourceNames {
usage[resourceName] = utilptr.To[resource.Quantity](utils.GetResourceRequestQuantity(pod, resourceName).DeepCopy())
}
return usage, nil
}
func (s *requestedUsageClient) capture(nodes []*v1.Node) error {
s._nodeUtilization = make(map[string]map[v1.ResourceName]*resource.Quantity)
s._pods = make(map[string][]*v1.Pod)
capturedNodes := []*v1.Node{}
for _, node := range nodes {
pods, err := podutil.ListPodsOnANode(node.Name, s.getPodsAssignedToNode, nil)
if err != nil {
klog.V(2).InfoS("Node will not be processed, error accessing its pods", "node", klog.KObj(node), "err", err)
continue
}
nodeUsage, err := nodeutil.NodeUtilization(pods, s.resourceNames, func(pod *v1.Pod) (v1.ResourceList, error) {
req, _ := utils.PodRequestsAndLimits(pod)
return req, nil
})
if err != nil {
return err
}
// store the snapshot of pods from the same (or the closest) node utilization computation
s._pods[node.Name] = pods
s._nodeUtilization[node.Name] = nodeUsage
capturedNodes = append(capturedNodes, node)
}
s._nodes = capturedNodes
return nil
}
type actualUsageClient struct {
resourceNames []v1.ResourceName
getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc
metricsCollector *metricscollector.MetricsCollector
_nodes []*v1.Node
_pods map[string][]*v1.Pod
_nodeUtilization map[string]map[v1.ResourceName]*resource.Quantity
}
var _ usageClient = &actualUsageClient{}
func newActualUsageSnapshot(
resourceNames []v1.ResourceName,
getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc,
metricsCollector *metricscollector.MetricsCollector,
) *actualUsageClient {
return &actualUsageClient{
resourceNames: resourceNames,
getPodsAssignedToNode: getPodsAssignedToNode,
metricsCollector: metricsCollector,
}
}
func (client *actualUsageClient) nodeUtilization(node string) map[v1.ResourceName]*resource.Quantity {
return client._nodeUtilization[node]
}
func (client *actualUsageClient) nodes() []*v1.Node {
return client._nodes
}
func (client *actualUsageClient) pods(node string) []*v1.Pod {
return client._pods[node]
}
func (client *actualUsageClient) podUsage(pod *v1.Pod) (map[v1.ResourceName]*resource.Quantity, error) {
// It's not efficient to keep track of every pod in the cluster when only a fraction of them gets evicted.
// Thus, take the current pod metrics without computing any softening (like e.g. EWMA).
podMetrics, err := client.metricsCollector.MetricsClient().MetricsV1beta1().PodMetricses(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("unable to get podmetrics for %q/%q: %v", pod.Namespace, pod.Name, err)
}
totalUsage := make(map[v1.ResourceName]*resource.Quantity)
for _, container := range podMetrics.Containers {
for _, resourceName := range client.resourceNames {
if _, exists := container.Usage[resourceName]; !exists {
continue
}
if totalUsage[resourceName] == nil {
totalUsage[resourceName] = utilptr.To[resource.Quantity](container.Usage[resourceName].DeepCopy())
} else {
totalUsage[resourceName].Add(container.Usage[resourceName])
}
}
}
return totalUsage, nil
}
func (client *actualUsageClient) capture(nodes []*v1.Node) error {
client._nodeUtilization = make(map[string]map[v1.ResourceName]*resource.Quantity)
client._pods = make(map[string][]*v1.Pod)
capturedNodes := []*v1.Node{}
for _, node := range nodes {
pods, err := podutil.ListPodsOnANode(node.Name, client.getPodsAssignedToNode, nil)
if err != nil {
klog.V(2).InfoS("Node will not be processed, error accessing its pods", "node", klog.KObj(node), "err", err)
continue
}
nodeUsage, err := client.metricsCollector.NodeUsage(node)
if err != nil {
return err
}
nodeUsage[v1.ResourcePods] = resource.NewQuantity(int64(len(pods)), resource.DecimalSI)
// store the snapshot of pods from the same (or the closest) node utilization computation
client._pods[node.Name] = pods
client._nodeUtilization[node.Name] = nodeUsage
capturedNodes = append(capturedNodes, node)
}
client._nodes = capturedNodes
return nil
}
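A rough sketch of how a caller could switch between the two clients behind the usageClient interface (metricsEnabled, resourceNames, getPodsAssignedToNode, metricsCollector and nodes are assumed to exist in the calling plugin; error handling is abbreviated):
var client usageClient = newRequestedUsageSnapshot(resourceNames, getPodsAssignedToNode)
if metricsEnabled {
	client = newActualUsageSnapshot(resourceNames, getPodsAssignedToNode, metricsCollector)
}
if err := client.capture(nodes); err != nil {
	return err
}
usage := client.nodeUtilization(nodes[0].Name)
klog.V(4).InfoS("node cpu usage", "milli", usage[v1.ResourceCPU].MilliValue())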


@@ -0,0 +1,135 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodeutilization
import (
"context"
"fmt"
"testing"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/informers"
fakeclientset "k8s.io/client-go/kubernetes/fake"
"k8s.io/metrics/pkg/apis/metrics/v1beta1"
fakemetricsclient "k8s.io/metrics/pkg/client/clientset/versioned/fake"
"sigs.k8s.io/descheduler/pkg/descheduler/metricscollector"
podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
"sigs.k8s.io/descheduler/test"
)
var gvr = schema.GroupVersionResource{Group: "metrics.k8s.io", Version: "v1beta1", Resource: "nodemetricses"}
func updateMetricsAndCheckNodeUtilization(
t *testing.T,
ctx context.Context,
newValue, expectedValue int64,
metricsClientset *fakemetricsclient.Clientset,
collector *metricscollector.MetricsCollector,
usageSnapshot usageClient,
nodes []*v1.Node,
nodeName string,
nodemetrics *v1beta1.NodeMetrics,
) {
t.Logf("Set current node cpu usage to %v", newValue)
nodemetrics.Usage[v1.ResourceCPU] = *resource.NewMilliQuantity(newValue, resource.DecimalSI)
metricsClientset.Tracker().Update(gvr, nodemetrics, "")
err := collector.Collect(ctx)
if err != nil {
t.Fatalf("failed to capture metrics: %v", err)
}
err = usageSnapshot.capture(nodes)
if err != nil {
t.Fatalf("failed to capture a snapshot: %v", err)
}
nodeUtilization := usageSnapshot.nodeUtilization(nodeName)
t.Logf("current node cpu usage: %v\n", nodeUtilization[v1.ResourceCPU].MilliValue())
if nodeUtilization[v1.ResourceCPU].MilliValue() != expectedValue {
t.Fatalf("cpu node usage expected to be %v, got %v instead", expectedValue, nodeUtilization[v1.ResourceCPU].MilliValue())
}
pods := usageSnapshot.pods(nodeName)
fmt.Printf("pods: %#v\n", pods)
if len(pods) != 2 {
t.Fatalf("expected 2 pods for node %v, got %v instead", nodeName, len(pods))
}
capturedNodes := usageSnapshot.nodes()
if len(capturedNodes) != 3 {
t.Fatalf("expected 3 captured node, got %v instead", len(capturedNodes))
}
}
func TestActualUsageClient(t *testing.T) {
n1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
n2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
n3 := test.BuildTestNode("n3", 2000, 3000, 10, nil)
p1 := test.BuildTestPod("p1", 400, 0, n1.Name, nil)
p21 := test.BuildTestPod("p21", 400, 0, n2.Name, nil)
p22 := test.BuildTestPod("p22", 400, 0, n2.Name, nil)
p3 := test.BuildTestPod("p3", 400, 0, n3.Name, nil)
nodes := []*v1.Node{n1, n2, n3}
n1metrics := test.BuildNodeMetrics("n1", 400, 1714978816)
n2metrics := test.BuildNodeMetrics("n2", 1400, 1714978816)
n3metrics := test.BuildNodeMetrics("n3", 300, 1714978816)
clientset := fakeclientset.NewSimpleClientset(n1, n2, n3, p1, p21, p22, p3)
metricsClientset := fakemetricsclient.NewSimpleClientset(n1metrics, n2metrics, n3metrics)
ctx := context.TODO()
resourceNames := []v1.ResourceName{
v1.ResourceCPU,
v1.ResourceMemory,
}
sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0)
podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
podsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer)
if err != nil {
t.Fatalf("Build get pods assigned to node function error: %v", err)
}
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())
collector := metricscollector.NewMetricsCollector(clientset, metricsClientset)
usageSnapshot := newActualUsageSnapshot(
resourceNames,
podsAssignedToNode,
collector,
)
updateMetricsAndCheckNodeUtilization(t, ctx,
1400, 1400,
metricsClientset, collector, usageSnapshot, nodes, n2.Name, n2metrics,
)
updateMetricsAndCheckNodeUtilization(t, ctx,
500, 1310,
metricsClientset, collector, usageSnapshot, nodes, n2.Name, n2metrics,
)
updateMetricsAndCheckNodeUtilization(t, ctx,
900, 1269,
metricsClientset, collector, usageSnapshot, nodes, n2.Name, n2metrics,
)
}
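The expected values in the three checks above are consistent with the collector smoothing node usage exponentially with a decay factor of roughly 0.1 (an inference from the numbers, not something stated in this diff): 1400 + 0.1*(500 - 1400) = 1310, and 1310 + 0.1*(900 - 1310) = 1269, so a new sample only moves the reported utilization part of the way toward the latest reading.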


@@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"sigs.k8s.io/descheduler/pkg/descheduler/metricscollector"
"k8s.io/klog/v2" "k8s.io/klog/v2"
) )
@@ -67,6 +68,7 @@ func (ei *evictorImpl) Evict(ctx context.Context, pod *v1.Pod, opts evictions.Ev
// handleImpl implements the framework handle which gets passed to plugins
type handleImpl struct {
clientSet clientset.Interface
metricsCollector *metricscollector.MetricsCollector
getPodsAssignedToNodeFunc podutil.GetPodsAssignedToNodeFunc
sharedInformerFactory informers.SharedInformerFactory
evictor *evictorImpl
@@ -79,6 +81,10 @@ func (hi *handleImpl) ClientSet() clientset.Interface {
return hi.clientSet
}
func (hi *handleImpl) MetricsCollector() *metricscollector.MetricsCollector {
return hi.metricsCollector
}
// GetPodsAssignedToNodeFunc retrieves GetPodsAssignedToNodeFunc implementation
func (hi *handleImpl) GetPodsAssignedToNodeFunc() podutil.GetPodsAssignedToNodeFunc {
return hi.getPodsAssignedToNodeFunc
@@ -128,6 +134,7 @@ type handleImplOpts struct {
sharedInformerFactory informers.SharedInformerFactory
getPodsAssignedToNodeFunc podutil.GetPodsAssignedToNodeFunc
podEvictor *evictions.PodEvictor
metricsCollector *metricscollector.MetricsCollector
}
// WithClientSet sets clientSet for the scheduling frameworkImpl.
@@ -155,6 +162,12 @@ func WithGetPodsAssignedToNodeFnc(getPodsAssignedToNodeFunc podutil.GetPodsAssig
}
}
func WithMetricsCollector(metricsCollector *metricscollector.MetricsCollector) Option {
return func(o *handleImplOpts) {
o.metricsCollector = metricsCollector
}
}
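A minimal sketch of wiring a collector into a profile through this option (client, metricsClient, registry, config and getPodsAssignedToNodeFunc are assumed to exist in the calling code; when Collect is re-run is left to the caller):
collector := metricscollector.NewMetricsCollector(client, metricsClient)
if err := collector.Collect(ctx); err != nil { // prime the collector before descheduling
	return err
}
profile, err := NewProfile(config, registry,
	WithClientSet(client),
	WithGetPodsAssignedToNodeFnc(getPodsAssignedToNodeFunc),
	WithMetricsCollector(collector),
)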
func getPluginConfig(pluginName string, pluginConfigs []api.PluginConfig) (*api.PluginConfig, int) {
for idx, pluginConfig := range pluginConfigs {
if pluginConfig.Name == pluginName {
@@ -253,6 +266,7 @@ func NewProfile(config api.DeschedulerProfile, reg pluginregistry.Registry, opts
profileName: config.Name,
podEvictor: hOpts.podEvictor,
},
metricsCollector: hOpts.metricsCollector,
}
pluginNames := append(config.Plugins.Deschedule.Enabled, config.Plugins.Balance.Enabled...)


@@ -22,6 +22,7 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"sigs.k8s.io/descheduler/pkg/descheduler/metricscollector"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions" "sigs.k8s.io/descheduler/pkg/descheduler/evictions"
podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod" podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
@@ -36,6 +37,7 @@ type Handle interface {
Evictor() Evictor
GetPodsAssignedToNodeFunc() podutil.GetPodsAssignedToNodeFunc
SharedInformerFactory() informers.SharedInformerFactory
MetricsCollector() *metricscollector.MetricsCollector
}
// Evictor defines an interface for filtering and evicting pods


@@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/metrics/pkg/apis/metrics/v1beta1"
utilptr "k8s.io/utils/ptr" utilptr "k8s.io/utils/ptr"
) )
@@ -67,6 +68,26 @@ func BuildTestPod(name string, cpu, memory int64, nodeName string, apply func(*v
return pod
}
// BuildPodMetrics creates a test podmetrics with given parameters.
func BuildPodMetrics(name string, millicpu, mem int64) *v1beta1.PodMetrics {
return &v1beta1.PodMetrics{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "default",
},
Window: metav1.Duration{Duration: 20010000000},
Containers: []v1beta1.ContainerMetrics{
{
Name: "container-1",
Usage: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(millicpu, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(mem, resource.BinarySI),
},
},
},
}
}
// GetMirrorPodAnnotation returns the annotation needed for mirror pod.
func GetMirrorPodAnnotation() map[string]string {
return map[string]string{
@@ -135,6 +156,19 @@ func BuildTestNode(name string, millicpu, mem, pods int64, apply func(*v1.Node))
return node
}
func BuildNodeMetrics(name string, millicpu, mem int64) *v1beta1.NodeMetrics {
return &v1beta1.NodeMetrics{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Window: metav1.Duration{Duration: 20010000000},
Usage: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(millicpu, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(mem, resource.BinarySI),
},
}
}
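These helpers pair with the fake metrics clientset used by the plugin tests, for example (a sketch; the fake clientset package is assumed to be imported as fakemetricsclient and the usage numbers are arbitrary):
metricsClientset := fakemetricsclient.NewSimpleClientset(
	BuildNodeMetrics("n1", 400, 1714978816),
	BuildPodMetrics("p1", 401, 0),
)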
// MakeBestEffortPod makes the given pod a BestEffort pod
func MakeBestEffortPod(pod *v1.Pod) {
pod.Spec.Containers[0].Resources.Requests = nil


@@ -0,0 +1,77 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by client-gen. DO NOT EDIT.
package fake
import (
"context"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
labels "k8s.io/apimachinery/pkg/labels"
watch "k8s.io/apimachinery/pkg/watch"
testing "k8s.io/client-go/testing"
v1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1"
)
// FakeNodeMetricses implements NodeMetricsInterface
type FakeNodeMetricses struct {
Fake *FakeMetricsV1beta1
}
var nodemetricsesResource = v1beta1.SchemeGroupVersion.WithResource("nodemetricses")
var nodemetricsesKind = v1beta1.SchemeGroupVersion.WithKind("NodeMetrics")
// Get takes name of the nodeMetrics, and returns the corresponding nodeMetrics object, and an error if there is any.
func (c *FakeNodeMetricses) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1beta1.NodeMetrics, err error) {
emptyResult := &v1beta1.NodeMetrics{}
obj, err := c.Fake.
Invokes(testing.NewRootGetActionWithOptions(nodemetricsesResource, name, options), emptyResult)
if obj == nil {
return emptyResult, err
}
return obj.(*v1beta1.NodeMetrics), err
}
// List takes label and field selectors, and returns the list of NodeMetricses that match those selectors.
func (c *FakeNodeMetricses) List(ctx context.Context, opts v1.ListOptions) (result *v1beta1.NodeMetricsList, err error) {
emptyResult := &v1beta1.NodeMetricsList{}
obj, err := c.Fake.
Invokes(testing.NewRootListActionWithOptions(nodemetricsesResource, nodemetricsesKind, opts), emptyResult)
if obj == nil {
return emptyResult, err
}
label, _, _ := testing.ExtractFromListOptions(opts)
if label == nil {
label = labels.Everything()
}
list := &v1beta1.NodeMetricsList{ListMeta: obj.(*v1beta1.NodeMetricsList).ListMeta}
for _, item := range obj.(*v1beta1.NodeMetricsList).Items {
if label.Matches(labels.Set(item.Labels)) {
list.Items = append(list.Items, item)
}
}
return list, err
}
// Watch returns a watch.Interface that watches the requested nodeMetricses.
func (c *FakeNodeMetricses) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) {
return c.Fake.
InvokesWatch(testing.NewRootWatchActionWithOptions(nodemetricsesResource, opts))
}


@@ -0,0 +1,81 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by client-gen. DO NOT EDIT.
package fake
import (
"context"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
labels "k8s.io/apimachinery/pkg/labels"
watch "k8s.io/apimachinery/pkg/watch"
testing "k8s.io/client-go/testing"
v1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1"
)
// FakePodMetricses implements PodMetricsInterface
type FakePodMetricses struct {
Fake *FakeMetricsV1beta1
ns string
}
var podmetricsesResource = v1beta1.SchemeGroupVersion.WithResource("podmetricses")
var podmetricsesKind = v1beta1.SchemeGroupVersion.WithKind("PodMetrics")
// Get takes name of the podMetrics, and returns the corresponding podMetrics object, and an error if there is any.
func (c *FakePodMetricses) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1beta1.PodMetrics, err error) {
emptyResult := &v1beta1.PodMetrics{}
obj, err := c.Fake.
Invokes(testing.NewGetActionWithOptions(podmetricsesResource, c.ns, name, options), emptyResult)
if obj == nil {
return emptyResult, err
}
return obj.(*v1beta1.PodMetrics), err
}
// List takes label and field selectors, and returns the list of PodMetricses that match those selectors.
func (c *FakePodMetricses) List(ctx context.Context, opts v1.ListOptions) (result *v1beta1.PodMetricsList, err error) {
emptyResult := &v1beta1.PodMetricsList{}
obj, err := c.Fake.
Invokes(testing.NewListActionWithOptions(podmetricsesResource, podmetricsesKind, c.ns, opts), emptyResult)
if obj == nil {
return emptyResult, err
}
label, _, _ := testing.ExtractFromListOptions(opts)
if label == nil {
label = labels.Everything()
}
list := &v1beta1.PodMetricsList{ListMeta: obj.(*v1beta1.PodMetricsList).ListMeta}
for _, item := range obj.(*v1beta1.PodMetricsList).Items {
if label.Matches(labels.Set(item.Labels)) {
list.Items = append(list.Items, item)
}
}
return list, err
}
// Watch returns a watch.Interface that watches the requested podMetricses.
func (c *FakePodMetricses) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) {
return c.Fake.
InvokesWatch(testing.NewWatchActionWithOptions(podmetricsesResource, c.ns, opts))
}