/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package descheduler

import (
	"context"
	"fmt"
	"math"
	"strconv"
	"time"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/client-go/discovery"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/tools/events"
	componentbaseconfig "k8s.io/component-base/config"
	"k8s.io/klog/v2"

	v1 "k8s.io/api/core/v1"
	policy "k8s.io/api/policy/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	utilversion "k8s.io/apimachinery/pkg/util/version"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
	fakeclientset "k8s.io/client-go/kubernetes/fake"
	listersv1 "k8s.io/client-go/listers/core/v1"
	schedulingv1 "k8s.io/client-go/listers/scheduling/v1"
	core "k8s.io/client-go/testing"

	"sigs.k8s.io/descheduler/pkg/descheduler/client"
	eutils "sigs.k8s.io/descheduler/pkg/descheduler/evictions/utils"
	nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
	"sigs.k8s.io/descheduler/pkg/tracing"
	"sigs.k8s.io/descheduler/pkg/utils"
	"sigs.k8s.io/descheduler/pkg/version"

	"sigs.k8s.io/descheduler/cmd/descheduler/app/options"
	"sigs.k8s.io/descheduler/metrics"
	"sigs.k8s.io/descheduler/pkg/api"
	"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
	podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
	"sigs.k8s.io/descheduler/pkg/framework/pluginregistry"
	frameworkprofile "sigs.k8s.io/descheduler/pkg/framework/profile"
	frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
)

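// eprunner is the signature shared by the deschedule and balance extension
// point runners of a profile.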
type eprunner func(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status

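// profileRunner bundles a profile name with its deschedule and balance
// extension point runners.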
type profileRunner struct {
	name                      string
	descheduleEPs, balanceEPs eprunner
}

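// descheduler carries the listers, informer factory, policy and pod evictor
// shared by a single descheduling loop.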
type descheduler struct {
	rs                     *options.DeschedulerServer
	podLister              listersv1.PodLister
	nodeLister             listersv1.NodeLister
	namespaceLister        listersv1.NamespaceLister
	priorityClassLister    schedulingv1.PriorityClassLister
	getPodsAssignedToNode  podutil.GetPodsAssignedToNodeFunc
	sharedInformerFactory  informers.SharedInformerFactory
	deschedulerPolicy      *api.DeschedulerPolicy
	eventRecorder          events.EventRecorder
	podEvictor             *evictions.PodEvictor
	podEvictionReactionFnc func(*fakeclientset.Clientset) func(action core.Action) (bool, runtime.Object, error)
}

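// newDescheduler assembles a descheduler instance from the given policy and
// shared informer factory and configures its pod evictor. The evictor client
// is set later, once the loop knows whether it runs against the real client
// or a dry-run (fake) client.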
func newDescheduler(rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string, eventRecorder events.EventRecorder, sharedInformerFactory informers.SharedInformerFactory) (*descheduler, error) {
	podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
	podLister := sharedInformerFactory.Core().V1().Pods().Lister()
	nodeLister := sharedInformerFactory.Core().V1().Nodes().Lister()
	namespaceLister := sharedInformerFactory.Core().V1().Namespaces().Lister()
	priorityClassLister := sharedInformerFactory.Scheduling().V1().PriorityClasses().Lister()

	getPodsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer)
	if err != nil {
		return nil, fmt.Errorf("build get pods assigned to node function error: %v", err)
	}

	podEvictor := evictions.NewPodEvictor(
		nil,
		eventRecorder,
		evictions.NewOptions().
			WithPolicyGroupVersion(evictionPolicyGroupVersion).
			WithMaxPodsToEvictPerNode(deschedulerPolicy.MaxNoOfPodsToEvictPerNode).
			WithMaxPodsToEvictPerNamespace(deschedulerPolicy.MaxNoOfPodsToEvictPerNamespace).
			WithMaxPodsToEvictTotal(deschedulerPolicy.MaxNoOfPodsToEvictTotal).
			WithDryRun(rs.DryRun).
			WithMetricsEnabled(!rs.DisableMetrics),
	)

	return &descheduler{
		rs:                     rs,
		podLister:              podLister,
		nodeLister:             nodeLister,
		namespaceLister:        namespaceLister,
		priorityClassLister:    priorityClassLister,
		getPodsAssignedToNode:  getPodsAssignedToNode,
		sharedInformerFactory:  sharedInformerFactory,
		deschedulerPolicy:      deschedulerPolicy,
		eventRecorder:          eventRecorder,
		podEvictor:             podEvictor,
		podEvictionReactionFnc: podEvictionReactionFnc,
	}, nil
}

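// runDeschedulerLoop performs one descheduling iteration: it refuses to run
// against clusters of at most one node, switches to a cached fake client when
// dry-run is enabled, resets the pod evictor and runs all configured profiles.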
func (d *descheduler) runDeschedulerLoop(ctx context.Context, nodes []*v1.Node) error {
	var span trace.Span
	ctx, span = tracing.Tracer().Start(ctx, "runDeschedulerLoop")
	defer span.End()
	defer func(loopStartDuration time.Time) {
		metrics.DeschedulerLoopDuration.With(map[string]string{}).Observe(time.Since(loopStartDuration).Seconds())
	}(time.Now())

	// If the cluster has at most one ready node, evicting pods would only cause
	// service disruption or degradation, so error out.
	if len(nodes) <= 1 {
		klog.V(1).InfoS("The cluster size is 0 or 1 meaning eviction causes service disruption or degradation. So aborting..")
		return fmt.Errorf("the cluster size is 0 or 1")
	}

	var client clientset.Interface
	// When dry-run mode is enabled, collect all the relevant objects (mostly pods)
	// under a fake client, so that evicting pods while running multiple strategies
	// in a row has the same cumulative effect as evicting pods for real.
	if d.rs.DryRun {
		klog.V(3).Infof("Building a cached client from the cluster for the dry run")
		// Create a new cache so we start from scratch without any leftovers
		fakeClient := fakeclientset.NewSimpleClientset()
		// simulate a pod eviction by deleting a pod
		fakeClient.PrependReactor("create", "pods", d.podEvictionReactionFnc(fakeClient))
		err := cachedClient(d.rs.Client, fakeClient, d.podLister, d.nodeLister, d.namespaceLister, d.priorityClassLister)
		if err != nil {
			return err
		}

		// create a new instance of the shared informer factory from the cached client
		fakeSharedInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
		// register the pod informer, otherwise it will not get started
		d.getPodsAssignedToNode, err = podutil.BuildGetPodsAssignedToNodeFunc(fakeSharedInformerFactory.Core().V1().Pods().Informer())
		if err != nil {
			return fmt.Errorf("build get pods assigned to node function error: %v", err)
		}

		fakeCtx, cncl := context.WithCancel(context.TODO())
		defer cncl()
		fakeSharedInformerFactory.Start(fakeCtx.Done())
		fakeSharedInformerFactory.WaitForCacheSync(fakeCtx.Done())

		client = fakeClient
		d.sharedInformerFactory = fakeSharedInformerFactory
	} else {
		client = d.rs.Client
	}

	klog.V(3).Infof("Setting up the pod evictor")
	d.podEvictor.SetClient(client)
	d.podEvictor.ResetCounters()

	d.runProfiles(ctx, client, nodes)

	klog.V(1).InfoS("Number of evicted pods", "totalEvicted", d.podEvictor.TotalEvicted())

	return nil
}

// runProfiles first runs the deschedule plugins of all profiles, and only then
// runs the balance plugins of all profiles (all balance plugins must come after
// all deschedule plugins).
// See https://github.com/kubernetes-sigs/descheduler/issues/979
func (d *descheduler) runProfiles(ctx context.Context, client clientset.Interface, nodes []*v1.Node) {
	var span trace.Span
	ctx, span = tracing.Tracer().Start(ctx, "runProfiles")
	defer span.End()
	var profileRunners []profileRunner
	for _, profile := range d.deschedulerPolicy.Profiles {
		currProfile, err := frameworkprofile.NewProfile(
			profile,
			pluginregistry.PluginRegistry,
			frameworkprofile.WithClientSet(client),
			frameworkprofile.WithSharedInformerFactory(d.sharedInformerFactory),
			frameworkprofile.WithPodEvictor(d.podEvictor),
			frameworkprofile.WithGetPodsAssignedToNodeFnc(d.getPodsAssignedToNode),
		)
		if err != nil {
			klog.ErrorS(err, "unable to create a profile", "profile", profile.Name)
			continue
		}
		profileRunners = append(profileRunners, profileRunner{profile.Name, currProfile.RunDeschedulePlugins, currProfile.RunBalancePlugins})
	}

	for _, profileR := range profileRunners {
		// First deschedule
		status := profileR.descheduleEPs(ctx, nodes)
		if status != nil && status.Err != nil {
			span.AddEvent("failed to perform deschedule operations", trace.WithAttributes(attribute.String("err", status.Err.Error()), attribute.String("profile", profileR.name), attribute.String("operation", tracing.DescheduleOperation)))
			klog.ErrorS(status.Err, "running deschedule extension point failed with error", "profile", profileR.name)
			continue
		}
	}

	for _, profileR := range profileRunners {
		// Balance later
		status := profileR.balanceEPs(ctx, nodes)
		if status != nil && status.Err != nil {
			span.AddEvent("failed to perform balance operations", trace.WithAttributes(attribute.String("err", status.Err.Error()), attribute.String("profile", profileR.name), attribute.String("operation", tracing.BalanceOperation)))
			klog.ErrorS(status.Err, "running balance extension point failed with error", "profile", profileR.name)
			continue
		}
	}
}

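// Run is the entry point of the descheduler: it builds the clients, loads and
// validates the policy, checks eviction API support and Kubernetes version
// compatibility, and then runs the strategies either directly or under leader
// election.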
func Run(ctx context.Context, rs *options.DeschedulerServer) error {
	var span trace.Span
	ctx, span = tracing.Tracer().Start(ctx, "Run")
	defer span.End()
	metrics.Register()

	clientConnection := rs.ClientConnection
	if rs.KubeconfigFile != "" && clientConnection.Kubeconfig == "" {
		clientConnection.Kubeconfig = rs.KubeconfigFile
	}
	rsclient, eventClient, err := createClients(clientConnection)
	if err != nil {
		return err
	}
	rs.Client = rsclient
	rs.EventClient = eventClient

	deschedulerPolicy, err := LoadPolicyConfig(rs.PolicyConfigFile, rs.Client, pluginregistry.PluginRegistry)
	if err != nil {
		return err
	}
	if deschedulerPolicy == nil {
		return fmt.Errorf("deschedulerPolicy is nil")
	}

	// Add k8s compatibility warnings to logs
	if err := validateVersionCompatibility(rs.Client.Discovery(), version.Get()); err != nil {
		klog.Warning(err.Error())
	}

	evictionPolicyGroupVersion, err := eutils.SupportEviction(rs.Client)
	if err != nil || len(evictionPolicyGroupVersion) == 0 {
		return err
	}

	runFn := func() error {
		return RunDeschedulerStrategies(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion)
	}

	if rs.LeaderElection.LeaderElect && rs.DeschedulingInterval.Seconds() == 0 {
		span.AddEvent("Validation Failure", trace.WithAttributes(attribute.String("err", "leaderElection must be used with deschedulingInterval")))
		return fmt.Errorf("leaderElection must be used with deschedulingInterval")
	}

	if rs.LeaderElection.LeaderElect && rs.DryRun {
		klog.V(1).Info("Warning: DryRun is set to True. You need to disable it to use Leader Election.")
	}

	if rs.LeaderElection.LeaderElect && !rs.DryRun {
		if err := NewLeaderElection(runFn, rsclient, &rs.LeaderElection, ctx); err != nil {
			span.AddEvent("Leader Election Failure", trace.WithAttributes(attribute.String("err", err.Error())))
			return fmt.Errorf("leaderElection: %w", err)
		}
		return nil
	}

	return runFn()
}

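// validateVersionCompatibility returns an error when the descheduler minor
// version and the Kubernetes server minor version are more than three minor
// releases apart, which falls outside the documented compatibility matrix.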
func validateVersionCompatibility(discovery discovery.DiscoveryInterface, deschedulerVersionInfo version.Info) error {
	kubeServerVersionInfo, err := discovery.ServerVersion()
	if err != nil {
		return fmt.Errorf("failed to discover Kubernetes server version: %v", err)
	}

	kubeServerVersion, err := utilversion.ParseSemantic(kubeServerVersionInfo.String())
	if err != nil {
		return fmt.Errorf("failed to parse Kubernetes server version '%s': %v", kubeServerVersionInfo.String(), err)
	}

	deschedulerMinor, err := strconv.ParseFloat(deschedulerVersionInfo.Minor, 64)
	if err != nil {
		return fmt.Errorf("failed to convert Descheduler minor version '%s' to float: %v", deschedulerVersionInfo.Minor, err)
	}

	kubeServerMinor := float64(kubeServerVersion.Minor())
	if math.Abs(deschedulerMinor-kubeServerMinor) > 3 {
		return fmt.Errorf(
			"descheduler version %s.%s may not be supported on your version of Kubernetes %v. "+
				"See compatibility docs for more info: https://github.com/kubernetes-sigs/descheduler#compatibility-matrix",
			deschedulerVersionInfo.Major,
			deschedulerVersionInfo.Minor,
			kubeServerVersionInfo.String(),
		)
	}

	return nil
}

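// podEvictionReactionFnc returns a reactor for the fake client used in dry-run
// mode: it simulates an eviction request by deleting the target pod from the
// fake client's tracker and falls back to the default reactor for every other
// action.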
func podEvictionReactionFnc(fakeClient *fakeclientset.Clientset) func(action core.Action) (bool, runtime.Object, error) {
	return func(action core.Action) (bool, runtime.Object, error) {
		if action.GetSubresource() == "eviction" {
			createAct, matched := action.(core.CreateActionImpl)
			if !matched {
				return false, nil, fmt.Errorf("unable to convert action to core.CreateActionImpl")
			}
			eviction, matched := createAct.Object.(*policy.Eviction)
			if !matched {
				return false, nil, fmt.Errorf("unable to convert action object into *policy.Eviction")
			}
			if err := fakeClient.Tracker().Delete(action.GetResource(), eviction.GetNamespace(), eviction.GetName()); err != nil {
				return false, nil, fmt.Errorf("unable to delete pod %v/%v: %v", eviction.GetNamespace(), eviction.GetName(), err)
			}
			return true, nil, nil
		}
		// fallback to the default reactor
		return false, nil, nil
	}
}

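// cachedClient copies the pods, nodes, namespaces and priority classes known
// to the listers into the fake client, so dry-run strategies operate on a
// snapshot of the cluster.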
func cachedClient(
	realClient clientset.Interface,
	fakeClient *fakeclientset.Clientset,
	podLister listersv1.PodLister,
	nodeLister listersv1.NodeLister,
	namespaceLister listersv1.NamespaceLister,
	priorityClassLister schedulingv1.PriorityClassLister,
) error {
	klog.V(3).Infof("Pulling resources for the cached client from the cluster")
	pods, err := podLister.List(labels.Everything())
	if err != nil {
		return fmt.Errorf("unable to list pods: %v", err)
	}

	for _, item := range pods {
		if _, err := fakeClient.CoreV1().Pods(item.Namespace).Create(context.TODO(), item, metav1.CreateOptions{}); err != nil {
			return fmt.Errorf("unable to copy pod: %v", err)
		}
	}

	nodes, err := nodeLister.List(labels.Everything())
	if err != nil {
		return fmt.Errorf("unable to list nodes: %v", err)
	}

	for _, item := range nodes {
		if _, err := fakeClient.CoreV1().Nodes().Create(context.TODO(), item, metav1.CreateOptions{}); err != nil {
			return fmt.Errorf("unable to copy node: %v", err)
		}
	}

	namespaces, err := namespaceLister.List(labels.Everything())
	if err != nil {
		return fmt.Errorf("unable to list namespaces: %v", err)
	}

	for _, item := range namespaces {
		if _, err := fakeClient.CoreV1().Namespaces().Create(context.TODO(), item, metav1.CreateOptions{}); err != nil {
			return fmt.Errorf("unable to copy namespace: %v", err)
		}
	}

	priorityClasses, err := priorityClassLister.List(labels.Everything())
	if err != nil {
		return fmt.Errorf("unable to list priorityclasses: %v", err)
	}

	for _, item := range priorityClasses {
		if _, err := fakeClient.SchedulingV1().PriorityClasses().Create(context.TODO(), item, metav1.CreateOptions{}); err != nil {
			return fmt.Errorf("unable to copy priorityclass: %v", err)
		}
	}

	return nil
}

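// RunDeschedulerStrategies builds the shared informer factory and event
// recorder, creates the descheduler, and then runs the descheduling loop
// either once (when no descheduling interval is set) or repeatedly on every
// interval until the context is cancelled.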
func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string) error {
	var span trace.Span
	ctx, span = tracing.Tracer().Start(ctx, "RunDeschedulerStrategies")
	defer span.End()

	sharedInformerFactory := informers.NewSharedInformerFactoryWithOptions(rs.Client, 0, informers.WithTransform(trimManagedFields))

	var nodeSelector string
	if deschedulerPolicy.NodeSelector != nil {
		nodeSelector = *deschedulerPolicy.NodeSelector
	}

	var eventClient clientset.Interface
	if rs.DryRun {
		eventClient = fakeclientset.NewSimpleClientset()
	} else {
		eventClient = rs.Client
	}
	eventBroadcaster, eventRecorder := utils.GetRecorderAndBroadcaster(ctx, eventClient)
	defer eventBroadcaster.Shutdown()

	descheduler, err := newDescheduler(rs, deschedulerPolicy, evictionPolicyGroupVersion, eventRecorder, sharedInformerFactory)
	if err != nil {
		span.AddEvent("Failed to create new descheduler", trace.WithAttributes(attribute.String("err", err.Error())))
		return err
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	sharedInformerFactory.Start(ctx.Done())
	sharedInformerFactory.WaitForCacheSync(ctx.Done())

	wait.NonSlidingUntil(func() {
		// A new context is created here intentionally to avoid nesting the spans via context.
		sCtx, sSpan := tracing.Tracer().Start(ctx, "NonSlidingUntil")
		defer sSpan.End()
		nodes, err := nodeutil.ReadyNodes(sCtx, rs.Client, descheduler.nodeLister, nodeSelector)
		if err != nil {
			sSpan.AddEvent("Failed to detect ready nodes", trace.WithAttributes(attribute.String("err", err.Error())))
			klog.Error(err)
			cancel()
			return
		}
		err = descheduler.runDeschedulerLoop(sCtx, nodes)
		if err != nil {
			sSpan.AddEvent("Failed to run descheduler loop", trace.WithAttributes(attribute.String("err", err.Error())))
			klog.Error(err)
			cancel()
			return
		}
		// If no interval was specified, cancel the context to end the wait.NonSlidingUntil loop after one iteration
		if rs.DeschedulingInterval.Seconds() == 0 {
			cancel()
		}
	}, rs.DeschedulingInterval, ctx.Done())

	return nil
}

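// GetPluginConfig returns the first plugin config with the given plugin name
// from the list, together with its index, or nil and 0 when no config matches.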
func GetPluginConfig(pluginName string, pluginConfigs []api.PluginConfig) (*api.PluginConfig, int) {
	for idx, pluginConfig := range pluginConfigs {
		if pluginConfig.Name == pluginName {
			return &pluginConfig, idx
		}
	}
	return nil, 0
}

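// createClients creates the main descheduler client and a second client used
// for event recording from the same client connection configuration.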
func createClients(clientConnection componentbaseconfig.ClientConnectionConfiguration) (clientset.Interface, clientset.Interface, error) {
	kClient, err := client.CreateClient(clientConnection, "descheduler")
	if err != nil {
		return nil, nil, err
	}

	eventClient, err := client.CreateClient(clientConnection, "")
	if err != nil {
		return nil, nil, err
	}

	return kClient, eventClient, nil
}

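// trimManagedFields is an informer transform that drops the managedFields
// metadata from cached objects to reduce memory usage.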
func trimManagedFields(obj interface{}) (interface{}, error) {
	if accessor, err := meta.Accessor(obj); err == nil {
		accessor.SetManagedFields(nil)
	}
	return obj, nil
}