1
0
mirror of https://github.com/kubernetes-sigs/descheduler.git synced 2026-01-26 05:14:13 +01:00

deschedule/balance order (continuation) (#1177)

* generalise RunDeschedulerLoop and RunProfiles and stabilish deschedule/balance order

* assign nodes outside RunDeschedulerLoop and use instanced profiles

* stop exporting internal profile bits

* refactoring RunProfiles and add methods to Deschduler

* types outside function

* shutdown eventBroadcaster outside NewDescheduler

* all new methods inside descheduler.go

* avoid exporting all Descheduler fields

* Address review comments

---------

Co-authored-by: Lucas Severo Alves <lucassalves65@gmail.com>
This commit is contained in:
Jan Chaloupka
2023-06-21 15:37:40 +02:00
committed by GitHub
parent a497541f39
commit 931aac9c71

View File

@@ -26,35 +26,185 @@ import (
"time"
"k8s.io/client-go/discovery"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/events"
componentbaseconfig "k8s.io/component-base/config"
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
fakeclientset "k8s.io/client-go/kubernetes/fake"
listersv1 "k8s.io/client-go/listers/core/v1"
schedulingv1 "k8s.io/client-go/listers/scheduling/v1"
core "k8s.io/client-go/testing"
"sigs.k8s.io/descheduler/pkg/descheduler/client"
eutils "sigs.k8s.io/descheduler/pkg/descheduler/evictions/utils"
nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
"sigs.k8s.io/descheduler/pkg/utils"
"sigs.k8s.io/descheduler/pkg/version"
"sigs.k8s.io/descheduler/cmd/descheduler/app/options"
"sigs.k8s.io/descheduler/metrics"
"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/client"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
eutils "sigs.k8s.io/descheduler/pkg/descheduler/evictions/utils"
nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
"sigs.k8s.io/descheduler/pkg/framework/pluginregistry"
frameworkprofile "sigs.k8s.io/descheduler/pkg/framework/profile"
"sigs.k8s.io/descheduler/pkg/utils"
"sigs.k8s.io/descheduler/pkg/version"
frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
)
type eprunner func(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status
type profileRunner struct {
name string
descheduleEPs, balanceEPs eprunner
}
type descheduler struct {
rs *options.DeschedulerServer
podLister listersv1.PodLister
nodeLister listersv1.NodeLister
namespaceLister listersv1.NamespaceLister
priorityClassLister schedulingv1.PriorityClassLister
getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc
sharedInformerFactory informers.SharedInformerFactory
evictionPolicyGroupVersion string
deschedulerPolicy *api.DeschedulerPolicy
eventRecorder events.EventRecorder
}
func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string, eventRecorder events.EventRecorder, sharedInformerFactory informers.SharedInformerFactory) (*descheduler, error) {
podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
podLister := sharedInformerFactory.Core().V1().Pods().Lister()
nodeLister := sharedInformerFactory.Core().V1().Nodes().Lister()
namespaceLister := sharedInformerFactory.Core().V1().Namespaces().Lister()
priorityClassLister := sharedInformerFactory.Scheduling().V1().PriorityClasses().Lister()
getPodsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer)
if err != nil {
return nil, fmt.Errorf("build get pods assigned to node function error: %v", err)
}
return &descheduler{
rs: rs,
podLister: podLister,
nodeLister: nodeLister,
namespaceLister: namespaceLister,
priorityClassLister: priorityClassLister,
getPodsAssignedToNode: getPodsAssignedToNode,
sharedInformerFactory: sharedInformerFactory,
evictionPolicyGroupVersion: evictionPolicyGroupVersion,
deschedulerPolicy: deschedulerPolicy,
eventRecorder: eventRecorder,
}, nil
}
func (d *descheduler) runDeschedulerLoop(ctx context.Context, nodes []*v1.Node) error {
loopStartDuration := time.Now()
defer metrics.DeschedulerLoopDuration.With(map[string]string{}).Observe(time.Since(loopStartDuration).Seconds())
// if len is still <= 1 error out
if len(nodes) <= 1 {
klog.V(1).InfoS("The cluster size is 0 or 1 meaning eviction causes service disruption or degradation. So aborting..")
return fmt.Errorf("the cluster size is 0 or 1")
}
var client clientset.Interface
// When the dry mode is enable, collect all the relevant objects (mostly pods) under a fake client.
// So when evicting pods while running multiple strategies in a row have the cummulative effect
// as is when evicting pods for real.
if d.rs.DryRun {
klog.V(3).Infof("Building a cached client from the cluster for the dry run")
// Create a new cache so we start from scratch without any leftovers
fakeClient, err := cachedClient(d.rs.Client, d.podLister, d.nodeLister, d.namespaceLister, d.priorityClassLister)
if err != nil {
return err
}
// create a new instance of the shared informer factor from the cached client
fakeSharedInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
// register the pod informer, otherwise it will not get running
d.getPodsAssignedToNode, err = podutil.BuildGetPodsAssignedToNodeFunc(fakeSharedInformerFactory.Core().V1().Pods().Informer())
if err != nil {
return fmt.Errorf("build get pods assigned to node function error: %v", err)
}
fakeCtx, cncl := context.WithCancel(context.TODO())
defer cncl()
fakeSharedInformerFactory.Start(fakeCtx.Done())
fakeSharedInformerFactory.WaitForCacheSync(fakeCtx.Done())
client = fakeClient
d.sharedInformerFactory = fakeSharedInformerFactory
} else {
client = d.rs.Client
}
klog.V(3).Infof("Building a pod evictor")
podEvictor := evictions.NewPodEvictor(
client,
d.evictionPolicyGroupVersion,
d.rs.DryRun,
d.deschedulerPolicy.MaxNoOfPodsToEvictPerNode,
d.deschedulerPolicy.MaxNoOfPodsToEvictPerNamespace,
nodes,
!d.rs.DisableMetrics,
d.eventRecorder,
)
d.runProfiles(ctx, client, nodes, podEvictor)
klog.V(1).InfoS("Number of evicted pods", "totalEvicted", podEvictor.TotalEvicted())
return nil
}
// runProfiles runs all the deschedule plugins of all profiles and
// later runs through all balance plugins of all profiles. (All Balance plugins should come after all Deschedule plugins)
// see https://github.com/kubernetes-sigs/descheduler/issues/979
func (d *descheduler) runProfiles(ctx context.Context, client clientset.Interface, nodes []*v1.Node, podEvictor *evictions.PodEvictor) {
var profileRunners []profileRunner
for _, profile := range d.deschedulerPolicy.Profiles {
currProfile, err := frameworkprofile.NewProfile(
profile,
pluginregistry.PluginRegistry,
frameworkprofile.WithClientSet(client),
frameworkprofile.WithSharedInformerFactory(d.sharedInformerFactory),
frameworkprofile.WithPodEvictor(podEvictor),
frameworkprofile.WithGetPodsAssignedToNodeFnc(d.getPodsAssignedToNode),
)
if err != nil {
klog.ErrorS(err, "unable to create a profile", "profile", profile.Name)
continue
}
profileRunners = append(profileRunners, profileRunner{profile.Name, currProfile.RunDeschedulePlugins, currProfile.RunBalancePlugins})
}
for _, profileR := range profileRunners {
// First deschedule
status := profileR.descheduleEPs(ctx, nodes)
if status != nil && status.Err != nil {
klog.ErrorS(status.Err, "running deschedule extension point failed with error", "profile", profileR.name)
continue
}
}
for _, profileR := range profileRunners {
// Balance Later
status := profileR.balanceEPs(ctx, nodes)
if status != nil && status.Err != nil {
klog.ErrorS(status.Err, "running balance extension point failed with error", "profile", profileR.name)
continue
}
}
}
func Run(ctx context.Context, rs *options.DeschedulerServer) error {
metrics.Register()
@@ -217,22 +367,7 @@ func cachedClient(
func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string) error {
sharedInformerFactory := informers.NewSharedInformerFactory(rs.Client, 0)
podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
podLister := sharedInformerFactory.Core().V1().Pods().Lister()
nodeLister := sharedInformerFactory.Core().V1().Nodes().Lister()
namespaceLister := sharedInformerFactory.Core().V1().Namespaces().Lister()
priorityClassLister := sharedInformerFactory.Scheduling().V1().PriorityClasses().Lister()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
getPodsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer)
if err != nil {
return fmt.Errorf("build get pods assigned to node function error: %v", err)
}
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())
var nodeSelector string
if deschedulerPolicy.NodeSelector != nil {
@@ -245,103 +380,32 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
} else {
eventClient = rs.Client
}
eventBroadcaster, eventRecorder := utils.GetRecorderAndBroadcaster(ctx, eventClient)
defer eventBroadcaster.Shutdown()
cycleSharedInformerFactory := sharedInformerFactory
descheduler, err := newDescheduler(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion, eventRecorder, sharedInformerFactory)
if err != nil {
return err
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())
wait.NonSlidingUntil(func() {
loopStartDuration := time.Now()
defer metrics.DeschedulerLoopDuration.With(map[string]string{}).Observe(time.Since(loopStartDuration).Seconds())
nodes, err := nodeutil.ReadyNodes(ctx, rs.Client, nodeLister, nodeSelector)
if err != nil {
klog.V(1).InfoS("Unable to get ready nodes", "err", err)
klog.Error(err)
cancel()
return
}
if len(nodes) <= 1 {
klog.V(1).InfoS("The cluster size is 0 or 1 meaning eviction causes service disruption or degradation. So aborting..")
err = descheduler.runDeschedulerLoop(ctx, nodes)
if err != nil {
klog.Error(err)
cancel()
return
}
var client clientset.Interface
// When the dry mode is enable, collect all the relevant objects (mostly pods) under a fake client.
// So when evicting pods while running multiple strategies in a row have the cummulative effect
// as is when evicting pods for real.
if rs.DryRun {
klog.V(3).Infof("Building a cached client from the cluster for the dry run")
// Create a new cache so we start from scratch without any leftovers
fakeClient, err := cachedClient(rs.Client, podLister, nodeLister, namespaceLister, priorityClassLister)
if err != nil {
klog.Error(err)
return
}
// create a new instance of the shared informer factor from the cached client
fakeSharedInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
// register the pod informer, otherwise it will not get running
getPodsAssignedToNode, err = podutil.BuildGetPodsAssignedToNodeFunc(fakeSharedInformerFactory.Core().V1().Pods().Informer())
if err != nil {
klog.Errorf("build get pods assigned to node function error: %v", err)
return
}
fakeCtx, cncl := context.WithCancel(context.TODO())
defer cncl()
fakeSharedInformerFactory.Start(fakeCtx.Done())
fakeSharedInformerFactory.WaitForCacheSync(fakeCtx.Done())
client = fakeClient
cycleSharedInformerFactory = fakeSharedInformerFactory
} else {
client = rs.Client
}
klog.V(3).Infof("Building a pod evictor")
podEvictor := evictions.NewPodEvictor(
client,
evictionPolicyGroupVersion,
rs.DryRun,
deschedulerPolicy.MaxNoOfPodsToEvictPerNode,
deschedulerPolicy.MaxNoOfPodsToEvictPerNamespace,
nodes,
!rs.DisableMetrics,
eventRecorder,
)
for _, profile := range deschedulerPolicy.Profiles {
currProfile, err := frameworkprofile.NewProfile(
profile,
pluginregistry.PluginRegistry,
frameworkprofile.WithClientSet(client),
frameworkprofile.WithSharedInformerFactory(cycleSharedInformerFactory),
frameworkprofile.WithPodEvictor(podEvictor),
frameworkprofile.WithGetPodsAssignedToNodeFnc(getPodsAssignedToNode),
)
if err != nil {
klog.ErrorS(err, "unable to create a profile", "profile", profile.Name)
continue
}
// First deschedule
status := currProfile.RunDeschedulePlugins(ctx, nodes)
if status != nil && status.Err != nil {
klog.ErrorS(status.Err, "running deschedule extension point failed with error", "profile", profile.Name)
continue
}
// Then balance
status = currProfile.RunBalancePlugins(ctx, nodes)
if status != nil && status.Err != nil {
klog.ErrorS(status.Err, "running balance extension point failed with error", "profile", profile.Name)
continue
}
}
klog.V(1).InfoS("Number of evicted pods", "totalEvicted", podEvictor.TotalEvicted())
// If there was no interval specified, send a signal to the stopChannel to end the wait.Until loop after 1 iteration
if rs.DeschedulingInterval.Seconds() == 0 {
cancel()