1
0
mirror of https://github.com/kubernetes-sigs/descheduler.git synced 2026-01-26 05:14:13 +01:00
Files
descheduler/pkg/descheduler/descheduler.go
2024-02-01 09:58:44 -05:00

474 lines
17 KiB
Go

/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package descheduler
import (
"context"
"errors"
"fmt"
"math"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/discovery"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/events"
componentbaseconfig "k8s.io/component-base/config"
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilversion "k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
fakeclientset "k8s.io/client-go/kubernetes/fake"
listersv1 "k8s.io/client-go/listers/core/v1"
schedulingv1 "k8s.io/client-go/listers/scheduling/v1"
core "k8s.io/client-go/testing"
"sigs.k8s.io/descheduler/pkg/descheduler/client"
eutils "sigs.k8s.io/descheduler/pkg/descheduler/evictions/utils"
nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
"sigs.k8s.io/descheduler/pkg/tracing"
"sigs.k8s.io/descheduler/pkg/utils"
"sigs.k8s.io/descheduler/pkg/version"
"sigs.k8s.io/descheduler/cmd/descheduler/app/options"
"sigs.k8s.io/descheduler/metrics"
"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
"sigs.k8s.io/descheduler/pkg/framework/pluginregistry"
frameworkprofile "sigs.k8s.io/descheduler/pkg/framework/profile"
frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
)
// eprunner is the signature shared by the per-profile extension point runners:
// it receives the context and the nodes to operate on and reports the outcome
// as a framework Status.
type eprunner func(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status
// profileRunner bundles a profile's name with the two runners that
// runProfiles invokes for it. It is built with a positional literal, so the
// field order is significant.
type profileRunner struct {
// name is the profile name used in log lines and trace events.
name string
// descheduleEPs is run in the first pass over all profiles, balanceEPs in
// the second pass.
descheduleEPs, balanceEPs eprunner
}
// descheduler holds everything a single descheduling loop needs: the server
// options, informer-backed listers, the policy to apply and the event recorder.
type descheduler struct {
// rs carries the server options (client, dry-run flag, metrics switch, ...).
rs *options.DeschedulerServer
// Listers below are used to seed the fake client when running in dry-run mode.
podLister listersv1.PodLister
nodeLister listersv1.NodeLister
namespaceLister listersv1.NamespaceLister
priorityClassLister schedulingv1.PriorityClassLister
// getPodsAssignedToNode is handed to every profile; it is replaced by a
// fake-client-backed function on each dry-run loop.
getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc
// sharedInformerFactory is handed to every profile; it is replaced by a
// fake-client-backed factory on each dry-run loop.
sharedInformerFactory informers.SharedInformerFactory
// evictionPolicyGroupVersion is passed through to the pod evictor.
evictionPolicyGroupVersion string
// deschedulerPolicy supplies the profiles and the eviction limits.
deschedulerPolicy *api.DeschedulerPolicy
// eventRecorder is passed through to the pod evictor.
eventRecorder events.EventRecorder
}
// newDescheduler wires a descheduler on top of the given shared informer
// factory. It requests the pod informer and the pod, node, namespace and
// priority class listers from the factory, and builds the
// pods-assigned-to-node lookup from the pod informer.
//
// An error is returned when that lookup function cannot be built.
func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string, eventRecorder events.EventRecorder, sharedInformerFactory informers.SharedInformerFactory) (*descheduler, error) {
	coreV1 := sharedInformerFactory.Core().V1()
	podInformer := coreV1.Pods().Informer()

	d := &descheduler{
		rs:                         rs,
		podLister:                  coreV1.Pods().Lister(),
		nodeLister:                 coreV1.Nodes().Lister(),
		namespaceLister:            coreV1.Namespaces().Lister(),
		priorityClassLister:        sharedInformerFactory.Scheduling().V1().PriorityClasses().Lister(),
		sharedInformerFactory:      sharedInformerFactory,
		evictionPolicyGroupVersion: evictionPolicyGroupVersion,
		deschedulerPolicy:          deschedulerPolicy,
		eventRecorder:              eventRecorder,
	}

	assignedPodsFn, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer)
	if err != nil {
		return nil, fmt.Errorf("build get pods assigned to node function error: %v", err)
	}
	d.getPodsAssignedToNode = assignedPodsFn

	return d, nil
}
// runDeschedulerLoop performs one descheduling pass over nodes: it builds a
// pod evictor and runs every configured profile through it.
//
// The pass is traced under the "runDeschedulerLoop" span and its duration is
// recorded in metrics.DeschedulerLoopDuration.
//
// In dry-run mode the pass operates on a fake client seeded from the listers,
// and d.getPodsAssignedToNode and d.sharedInformerFactory are replaced with
// fake-client-backed instances for the duration of the call.
//
// An error is returned when fewer than two nodes are supplied or when the
// dry-run client or its pod lookup cannot be built.
func (d *descheduler) runDeschedulerLoop(ctx context.Context, nodes []*v1.Node) error {
	var span trace.Span
	ctx, span = tracing.Tracer().Start(ctx, "runDeschedulerLoop")
	defer span.End()
	defer func(loopStartDuration time.Time) {
		metrics.DeschedulerLoopDuration.With(map[string]string{}).Observe(time.Since(loopStartDuration).Seconds())
	}(time.Now())

	// if len is still <= 1 error out
	if len(nodes) <= 1 {
		klog.V(1).InfoS("The cluster size is 0 or 1 meaning eviction causes service disruption or degradation. So aborting..")
		return fmt.Errorf("the cluster size is 0 or 1")
	}

	var client clientset.Interface
	// When dry-run mode is enabled, collect all the relevant objects (mostly pods) under a fake client,
	// so that evicting pods while running multiple strategies in a row has the same cumulative effect
	// as evicting pods for real.
	if d.rs.DryRun {
		klog.V(3).Infof("Building a cached client from the cluster for the dry run")
		// Create a new cache so we start from scratch without any leftovers
		fakeClient, err := cachedClient(d.rs.Client, d.podLister, d.nodeLister, d.namespaceLister, d.priorityClassLister)
		if err != nil {
			return err
		}

		// create a new instance of the shared informer factory from the cached client
		fakeSharedInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
		// register the pod informer, otherwise it will not get running
		getPodsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(fakeSharedInformerFactory.Core().V1().Pods().Informer())
		if err != nil {
			// Leave the receiver's existing lookup function untouched on failure.
			return fmt.Errorf("build get pods assigned to node function error: %w", err)
		}
		d.getPodsAssignedToNode = getPodsAssignedToNode

		// Derive the informer lifetime from ctx so that cancelling the caller
		// also stops the fake informers and unblocks the cache sync.
		fakeCtx, cncl := context.WithCancel(ctx)
		defer cncl()
		fakeSharedInformerFactory.Start(fakeCtx.Done())
		fakeSharedInformerFactory.WaitForCacheSync(fakeCtx.Done())

		client = fakeClient
		d.sharedInformerFactory = fakeSharedInformerFactory
	} else {
		client = d.rs.Client
	}

	klog.V(3).Infof("Building a pod evictor")
	podEvictor := evictions.NewPodEvictor(
		client,
		d.evictionPolicyGroupVersion,
		d.rs.DryRun,
		d.deschedulerPolicy.MaxNoOfPodsToEvictPerNode,
		d.deschedulerPolicy.MaxNoOfPodsToEvictPerNamespace,
		nodes,
		!d.rs.DisableMetrics,
		d.eventRecorder,
	)

	d.runProfiles(ctx, client, nodes, podEvictor)

	klog.V(1).InfoS("Number of evicted pods", "totalEvicted", podEvictor.TotalEvicted())

	return nil
}
// runProfiles builds a runner for every profile in the policy and then makes
// two passes over them: first the deschedule plugins of all profiles, then the
// balance plugins of all profiles, so that every Balance plugin runs after
// every Deschedule plugin.
// see https://github.com/kubernetes-sigs/descheduler/issues/979
//
// A profile that cannot be constructed is logged and skipped. A failing
// extension point is logged and recorded on the span, and the remaining
// profiles still run.
func (d *descheduler) runProfiles(ctx context.Context, client clientset.Interface, nodes []*v1.Node, podEvictor *evictions.PodEvictor) {
	var span trace.Span
	ctx, span = tracing.Tracer().Start(ctx, "runProfiles")
	defer span.End()

	runners := make([]profileRunner, 0, len(d.deschedulerPolicy.Profiles))
	for _, profile := range d.deschedulerPolicy.Profiles {
		built, err := frameworkprofile.NewProfile(
			profile,
			pluginregistry.PluginRegistry,
			frameworkprofile.WithClientSet(client),
			frameworkprofile.WithSharedInformerFactory(d.sharedInformerFactory),
			frameworkprofile.WithPodEvictor(podEvictor),
			frameworkprofile.WithGetPodsAssignedToNodeFnc(d.getPodsAssignedToNode),
		)
		if err != nil {
			klog.ErrorS(err, "unable to create a profile", "profile", profile.Name)
			continue
		}
		runners = append(runners, profileRunner{
			name:          profile.Name,
			descheduleEPs: built.RunDeschedulePlugins,
			balanceEPs:    built.RunBalancePlugins,
		})
	}

	// Pass 1: deschedule extension points of every profile.
	for _, r := range runners {
		status := r.descheduleEPs(ctx, nodes)
		if status == nil || status.Err == nil {
			continue
		}
		span.AddEvent("failed to perform deschedule operations", trace.WithAttributes(attribute.String("err", status.Err.Error()), attribute.String("profile", r.name), attribute.String("operation", tracing.DescheduleOperation)))
		klog.ErrorS(status.Err, "running deschedule extension point failed with error", "profile", r.name)
	}

	// Pass 2: balance extension points of every profile.
	for _, r := range runners {
		status := r.balanceEPs(ctx, nodes)
		if status == nil || status.Err == nil {
			continue
		}
		span.AddEvent("failed to perform balance operations", trace.WithAttributes(attribute.String("err", status.Err.Error()), attribute.String("profile", r.name), attribute.String("operation", tracing.BalanceOperation)))
		klog.ErrorS(status.Err, "running balance extension point failed with error", "profile", r.name)
	}
}
// Run is the descheduler entry point. It registers metrics, creates the
// Kubernetes clients (stored on rs), loads the policy, logs a version
// compatibility warning when applicable, and then runs the descheduling
// strategies, either directly or under leader election.
//
// It returns an error when the clients or policy cannot be set up, when the
// eviction support probe fails, when leader election is requested without a
// descheduling interval, or when leader election itself fails. When the probe
// reports no eviction policy group version, Run logs a warning and returns nil
// without running any strategy.
func Run(ctx context.Context, rs *options.DeschedulerServer) error {
	var span trace.Span
	ctx, span = tracing.Tracer().Start(ctx, "Run")
	defer span.End()
	metrics.Register()

	clientConnection := rs.ClientConnection
	if rs.KubeconfigFile != "" && clientConnection.Kubeconfig == "" {
		clientConnection.Kubeconfig = rs.KubeconfigFile
	}
	rsclient, eventClient, err := createClients(clientConnection)
	if err != nil {
		return err
	}
	rs.Client = rsclient
	rs.EventClient = eventClient

	deschedulerPolicy, err := LoadPolicyConfig(rs.PolicyConfigFile, rs.Client, pluginregistry.PluginRegistry)
	if err != nil {
		return err
	}
	if deschedulerPolicy == nil {
		return fmt.Errorf("deschedulerPolicy is nil")
	}

	// Add k8s compatibility warnings to logs
	if err := validateVersionCompatibility(rs.Client.Discovery(), version.Get()); err != nil {
		klog.Warning(err.Error())
	}

	evictionPolicyGroupVersion, err := eutils.SupportEviction(rs.Client)
	if err != nil {
		return err
	}
	if len(evictionPolicyGroupVersion) == 0 {
		// Keep the historical nil return, but do not exit silently: without
		// this message the process ends successfully with no explanation.
		klog.Warning("no eviction policy group version detected; skipping descheduling")
		return nil
	}

	runFn := func() error {
		return RunDeschedulerStrategies(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion)
	}

	if rs.LeaderElection.LeaderElect && rs.DeschedulingInterval.Seconds() == 0 {
		span.AddEvent("Validation Failure", trace.WithAttributes(attribute.String("err", "leaderElection must be used with deschedulingInterval")))
		return fmt.Errorf("leaderElection must be used with deschedulingInterval")
	}

	if rs.LeaderElection.LeaderElect && rs.DryRun {
		klog.V(1).Info("Warning: DryRun is set to True. You need to disable it to use Leader Election.")
	}

	// Leader election is only used outside dry-run mode.
	if rs.LeaderElection.LeaderElect && !rs.DryRun {
		if err := NewLeaderElection(runFn, rsclient, &rs.LeaderElection, ctx); err != nil {
			span.AddEvent("Leader Election Failure", trace.WithAttributes(attribute.String("err", err.Error())))
			return fmt.Errorf("leaderElection: %w", err)
		}
		return nil
	}

	return runFn()
}
// validateVersionCompatibility compares the minor version of the Kubernetes
// server (obtained through discovery) with the minor version parsed from
// versionInfo.GitVersion.
//
// It returns an error when either version cannot be obtained or parsed, or
// when the two minor versions differ by more than 3. It returns nil otherwise.
func validateVersionCompatibility(discovery discovery.DiscoveryInterface, versionInfo version.Info) error {
	serverVersionInfo, err := discovery.ServerVersion()
	if err != nil {
		return errors.New("failed to discover Kubernetes server version: " + err.Error())
	}

	serverVersion, err := utilversion.ParseSemantic(serverVersionInfo.String())
	if err != nil {
		return errors.New("failed to parse Kubernetes server version: " + err.Error())
	}

	deschedulerVersion, err := utilversion.ParseGeneric(versionInfo.GitVersion)
	if err != nil {
		return errors.New("failed to parse Descheduler version: " + err.Error())
	}

	deschedulerMinor := float64(deschedulerVersion.Minor())
	serverMinor := float64(serverVersion.Minor())
	if math.Abs(deschedulerMinor-serverMinor) > 3 {
		return fmt.Errorf(
			"descheduler version %v may not be supported on your version of Kubernetes %v. "+
				"See compatibility docs for more info: https://github.com/kubernetes-sigs/descheduler#compatibility-matrix",
			deschedulerVersion.String(),
			serverVersionInfo.String(),
		)
	}

	return nil
}
// cachedClient builds a fake clientset populated with copies of every pod,
// node, namespace and priority class currently returned by the given listers.
// A reactor is installed so that creating the "eviction" subresource of a pod
// deletes that pod from the fake client's tracker, which simulates an eviction.
//
// realClient is accepted for signature compatibility and is not used here.
//
// An error is returned when any lister fails or when an object cannot be
// copied into the fake client.
func cachedClient(
	realClient clientset.Interface,
	podLister listersv1.PodLister,
	nodeLister listersv1.NodeLister,
	namespaceLister listersv1.NamespaceLister,
	priorityClassLister schedulingv1.PriorityClassLister,
) (clientset.Interface, error) {
	fakeClient := fakeclientset.NewSimpleClientset()
	// simulate a pod eviction by deleting a pod
	fakeClient.PrependReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
		if action.GetSubresource() != "eviction" {
			// fallback to the default reactor
			return false, nil, nil
		}

		// Every failure below is reported with handled=true: the fake client
		// ignores the results of a reactor that reports handled=false, which
		// would silently drop the error and fall through to the next reactor.
		createAct, matched := action.(core.CreateActionImpl)
		if !matched {
			return true, nil, fmt.Errorf("unable to convert action to core.CreateActionImpl")
		}
		eviction, matched := createAct.Object.(*policy.Eviction)
		if !matched {
			return true, nil, fmt.Errorf("unable to convert action object into *policy.Eviction")
		}
		if err := fakeClient.Tracker().Delete(action.GetResource(), eviction.GetNamespace(), eviction.GetName()); err != nil {
			return true, nil, fmt.Errorf("unable to delete pod %v/%v: %w", eviction.GetNamespace(), eviction.GetName(), err)
		}
		return true, nil, nil
	})

	klog.V(3).Infof("Pulling resources for the cached client from the cluster")
	pods, err := podLister.List(labels.Everything())
	if err != nil {
		return nil, fmt.Errorf("unable to list pods: %w", err)
	}
	for _, item := range pods {
		if _, err := fakeClient.CoreV1().Pods(item.Namespace).Create(context.TODO(), item, metav1.CreateOptions{}); err != nil {
			return nil, fmt.Errorf("unable to copy pod: %w", err)
		}
	}

	nodes, err := nodeLister.List(labels.Everything())
	if err != nil {
		return nil, fmt.Errorf("unable to list nodes: %w", err)
	}
	for _, item := range nodes {
		if _, err := fakeClient.CoreV1().Nodes().Create(context.TODO(), item, metav1.CreateOptions{}); err != nil {
			return nil, fmt.Errorf("unable to copy node: %w", err)
		}
	}

	namespaces, err := namespaceLister.List(labels.Everything())
	if err != nil {
		return nil, fmt.Errorf("unable to list namespaces: %w", err)
	}
	for _, item := range namespaces {
		if _, err := fakeClient.CoreV1().Namespaces().Create(context.TODO(), item, metav1.CreateOptions{}); err != nil {
			return nil, fmt.Errorf("unable to copy namespace: %w", err)
		}
	}

	priorityClasses, err := priorityClassLister.List(labels.Everything())
	if err != nil {
		return nil, fmt.Errorf("unable to list priorityclasses: %w", err)
	}
	for _, item := range priorityClasses {
		if _, err := fakeClient.SchedulingV1().PriorityClasses().Create(context.TODO(), item, metav1.CreateOptions{}); err != nil {
			return nil, fmt.Errorf("unable to copy priorityclass: %w", err)
		}
	}

	return fakeClient, nil
}
// RunDeschedulerStrategies sets up the shared informers, the event recorder
// and a descheduler, and then runs the descheduling loop at every
// rs.DeschedulingInterval until the context is cancelled.
//
// The loop stops after the first iteration when the interval is zero, and
// stops early when the ready nodes cannot be determined or when a descheduling
// pass fails.
//
// In dry-run mode events are recorded against a fake clientset. An error is
// returned only when the descheduler cannot be constructed.
//
// NOTE(review): failures inside the loop are logged and recorded on the span
// but are not reflected in the return value.
func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string) error {
	var span trace.Span
	ctx, span = tracing.Tracer().Start(ctx, "RunDeschedulerStrategies")
	defer span.End()

	sharedInformerFactory := informers.NewSharedInformerFactoryWithOptions(rs.Client, 0, informers.WithTransform(trimManagedFields))
	nodeLister := sharedInformerFactory.Core().V1().Nodes().Lister()

	nodeSelector := ""
	if selector := deschedulerPolicy.NodeSelector; selector != nil {
		nodeSelector = *selector
	}

	// Events go to the real client unless this is a dry run.
	var eventClient clientset.Interface = rs.Client
	if rs.DryRun {
		eventClient = fakeclientset.NewSimpleClientset()
	}
	eventBroadcaster, eventRecorder := utils.GetRecorderAndBroadcaster(ctx, eventClient)
	defer eventBroadcaster.Shutdown()

	descheduler, err := newDescheduler(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion, eventRecorder, sharedInformerFactory)
	if err != nil {
		span.AddEvent("Failed to create new descheduler", trace.WithAttributes(attribute.String("err", err.Error())))
		return err
	}

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	stopCh := ctx.Done()
	sharedInformerFactory.Start(stopCh)
	sharedInformerFactory.WaitForCacheSync(stopCh)

	// abort records the failure on the iteration span, logs it and stops the loop.
	abort := func(iterSpan trace.Span, event string, cause error) {
		iterSpan.AddEvent(event, trace.WithAttributes(attribute.String("err", cause.Error())))
		klog.Error(cause)
		cancel()
	}

	runOnce := func() {
		// A new context is created here intentionally to avoid nesting the spans via context.
		sCtx, sSpan := tracing.Tracer().Start(ctx, "NonSlidingUntil")
		defer sSpan.End()

		nodes, err := nodeutil.ReadyNodes(sCtx, rs.Client, nodeLister, nodeSelector)
		if err != nil {
			abort(sSpan, "Failed to detect ready nodes", err)
			return
		}

		if err := descheduler.runDeschedulerLoop(sCtx, nodes); err != nil {
			abort(sSpan, "Failed to run descheduler loop", err)
			return
		}

		// With no interval specified, cancel the context so the loop ends after this single iteration.
		if rs.DeschedulingInterval.Seconds() == 0 {
			cancel()
		}
	}

	wait.NonSlidingUntil(runOnce, rs.DeschedulingInterval, stopCh)

	return nil
}
// GetPluginConfig looks up the first entry in pluginConfigs whose Name equals
// pluginName. On a match it returns a pointer to a copy of that entry together
// with its index; modifying the returned value does not modify the slice.
// When nothing matches it returns nil and index 0, so callers must test the
// pointer, not the index, to detect a miss.
func GetPluginConfig(pluginName string, pluginConfigs []api.PluginConfig) (*api.PluginConfig, int) {
	for idx := range pluginConfigs {
		if pluginConfigs[idx].Name != pluginName {
			continue
		}
		// Hand out a copy, not a pointer into the slice.
		match := pluginConfigs[idx]
		return &match, idx
	}
	return nil, 0
}
// createClients creates two clients from the same connection configuration:
// the first is created with the "descheduler" user agent argument, the second
// (returned as the event client) with an empty one. If either creation fails,
// both results are nil and the error is returned.
func createClients(clientConnection componentbaseconfig.ClientConnectionConfiguration) (clientset.Interface, clientset.Interface, error) {
	var (
		kClient, eventClient clientset.Interface
		err                  error
	)

	if kClient, err = client.CreateClient(clientConnection, "descheduler"); err != nil {
		return nil, nil, err
	}
	if eventClient, err = client.CreateClient(clientConnection, ""); err != nil {
		return nil, nil, err
	}

	return kClient, eventClient, nil
}
// trimManagedFields is an informer transform that clears the managed fields
// of obj in place and returns it. If no metadata accessor can be obtained for
// obj, it is returned unchanged. The returned error is always nil.
func trimManagedFields(obj interface{}) (interface{}, error) {
	accessor, err := meta.Accessor(obj)
	if err != nil {
		// Nothing to trim; pass the object through as-is.
		return obj, nil
	}
	accessor.SetManagedFields(nil)
	return obj, nil
}