diff --git a/pkg/descheduler/descheduler.go b/pkg/descheduler/descheduler.go
index 91c2e0cdd..2720a4bcd 100644
--- a/pkg/descheduler/descheduler.go
+++ b/pkg/descheduler/descheduler.go
@@ -26,35 +26,185 @@ import (
 	"time"

 	"k8s.io/client-go/discovery"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/tools/events"
 	componentbaseconfig "k8s.io/component-base/config"
 	"k8s.io/klog/v2"

+	v1 "k8s.io/api/core/v1"
 	policy "k8s.io/api/policy/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
-	"k8s.io/client-go/informers"
 	clientset "k8s.io/client-go/kubernetes"
 	fakeclientset "k8s.io/client-go/kubernetes/fake"
 	listersv1 "k8s.io/client-go/listers/core/v1"
 	schedulingv1 "k8s.io/client-go/listers/scheduling/v1"
 	core "k8s.io/client-go/testing"
+	"sigs.k8s.io/descheduler/pkg/descheduler/client"
+	eutils "sigs.k8s.io/descheduler/pkg/descheduler/evictions/utils"
+	nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
+	"sigs.k8s.io/descheduler/pkg/utils"
+	"sigs.k8s.io/descheduler/pkg/version"
+
 	"sigs.k8s.io/descheduler/cmd/descheduler/app/options"
 	"sigs.k8s.io/descheduler/metrics"
 	"sigs.k8s.io/descheduler/pkg/api"
-	"sigs.k8s.io/descheduler/pkg/descheduler/client"
 	"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
-	eutils "sigs.k8s.io/descheduler/pkg/descheduler/evictions/utils"
-	nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
 	podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
 	"sigs.k8s.io/descheduler/pkg/framework/pluginregistry"
 	frameworkprofile "sigs.k8s.io/descheduler/pkg/framework/profile"
-	"sigs.k8s.io/descheduler/pkg/utils"
-	"sigs.k8s.io/descheduler/pkg/version"
+	frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
 )

+type eprunner func(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status
+
+type profileRunner struct {
+	name                      string
+	descheduleEPs, balanceEPs eprunner
+}
+
+type descheduler struct {
+	rs                         *options.DeschedulerServer
+	podLister                  listersv1.PodLister
+	nodeLister                 listersv1.NodeLister
+	namespaceLister            listersv1.NamespaceLister
+	priorityClassLister        schedulingv1.PriorityClassLister
+	getPodsAssignedToNode      podutil.GetPodsAssignedToNodeFunc
+	sharedInformerFactory      informers.SharedInformerFactory
+	evictionPolicyGroupVersion string
+	deschedulerPolicy          *api.DeschedulerPolicy
+	eventRecorder              events.EventRecorder
+}
+
+func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string, eventRecorder events.EventRecorder, sharedInformerFactory informers.SharedInformerFactory) (*descheduler, error) {
+	podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
+	podLister := sharedInformerFactory.Core().V1().Pods().Lister()
+	nodeLister := sharedInformerFactory.Core().V1().Nodes().Lister()
+	namespaceLister := sharedInformerFactory.Core().V1().Namespaces().Lister()
+	priorityClassLister := sharedInformerFactory.Scheduling().V1().PriorityClasses().Lister()
+
+	getPodsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer)
+	if err != nil {
+		return nil, fmt.Errorf("build get pods assigned to node function error: %v", err)
+	}
+
+	return &descheduler{
+		rs:                         rs,
+		podLister:                  podLister,
+		nodeLister:                 nodeLister,
+		namespaceLister:            namespaceLister,
+		priorityClassLister:        priorityClassLister,
+		getPodsAssignedToNode:      getPodsAssignedToNode,
+		sharedInformerFactory:      sharedInformerFactory,
+		evictionPolicyGroupVersion: evictionPolicyGroupVersion,
+		deschedulerPolicy:          deschedulerPolicy,
+		eventRecorder:              eventRecorder,
+	}, nil
+}
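Reviewer note: newDescheduler only wires listers off the one shared informer factory; nothing is cached until the factory is started, which RunDeschedulerStrategies now does after construction. That contract is easy to exercise against a fake clientset. A minimal sketch, using client-go only; the pod name and namespace are invented:

```go
package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	fakeclientset "k8s.io/client-go/kubernetes/fake"
)

func main() {
	// Seed a fake clientset, standing in for rs.Client.
	client := fakeclientset.NewSimpleClientset(&v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "p1", Namespace: "default"},
	})

	// One factory feeds every lister, the same shape newDescheduler relies on.
	factory := informers.NewSharedInformerFactory(client, 0)
	podLister := factory.Core().V1().Pods().Lister() // registers the pod informer

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Listers only see objects once the factory is started and caches have synced.
	factory.Start(ctx.Done())
	factory.WaitForCacheSync(ctx.Done())

	pods, err := podLister.List(labels.Everything())
	fmt.Println(len(pods), err) // 1 <nil>
}
```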
+
+func (d *descheduler) runDeschedulerLoop(ctx context.Context, nodes []*v1.Node) error {
+	loopStartDuration := time.Now()
+	defer metrics.DeschedulerLoopDuration.With(map[string]string{}).Observe(time.Since(loopStartDuration).Seconds())
+
+	// a cluster of size 0 or 1 leaves nothing to deschedule, so error out
+	if len(nodes) <= 1 {
+		klog.V(1).InfoS("The cluster size is 0 or 1 meaning eviction causes service disruption or degradation. So aborting..")
+		return fmt.Errorf("the cluster size is 0 or 1")
+	}
+
+	var client clientset.Interface
+	// When dry run mode is enabled, collect all the relevant objects (mostly pods) under a fake client,
+	// so that evicting pods across multiple strategies in a row has the same cumulative effect
+	// as evicting pods for real.
+	if d.rs.DryRun {
+		klog.V(3).Infof("Building a cached client from the cluster for the dry run")
+		// Create a new cache so we start from scratch without any leftovers
+		fakeClient, err := cachedClient(d.rs.Client, d.podLister, d.nodeLister, d.namespaceLister, d.priorityClassLister)
+		if err != nil {
+			return err
+		}
+
+		// create a new instance of the shared informer factory from the cached client
+		fakeSharedInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
+		// register the pod informer, otherwise it will never start
+		d.getPodsAssignedToNode, err = podutil.BuildGetPodsAssignedToNodeFunc(fakeSharedInformerFactory.Core().V1().Pods().Informer())
+		if err != nil {
+			return fmt.Errorf("build get pods assigned to node function error: %v", err)
+		}
+
+		fakeCtx, cncl := context.WithCancel(context.TODO())
+		defer cncl()
+		fakeSharedInformerFactory.Start(fakeCtx.Done())
+		fakeSharedInformerFactory.WaitForCacheSync(fakeCtx.Done())
+
+		client = fakeClient
+		d.sharedInformerFactory = fakeSharedInformerFactory
+	} else {
+		client = d.rs.Client
+	}
+
+	klog.V(3).Infof("Building a pod evictor")
+	podEvictor := evictions.NewPodEvictor(
+		client,
+		d.evictionPolicyGroupVersion,
+		d.rs.DryRun,
+		d.deschedulerPolicy.MaxNoOfPodsToEvictPerNode,
+		d.deschedulerPolicy.MaxNoOfPodsToEvictPerNamespace,
+		nodes,
+		!d.rs.DisableMetrics,
+		d.eventRecorder,
+	)
+
+	d.runProfiles(ctx, client, nodes, podEvictor)
+
+	klog.V(1).InfoS("Number of evicted pods", "totalEvicted", podEvictor.TotalEvicted())
+
+	return nil
+}
+
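The dry-run branch is worth a concrete illustration: deletes issued against a fake clientset mutate its in-memory object tracker, so a later strategy in the same loop observes the evictions of an earlier one, which is the cumulative effect the comment describes. A toy sketch of just that property (fake client only; cachedClient and the real pod evictor are left out, and the pod names are invented):

```go
package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	fakeclientset "k8s.io/client-go/kubernetes/fake"
)

func main() {
	ctx := context.Background()
	// Stand-in for the snapshot cachedClient builds from the live cluster.
	client := fakeclientset.NewSimpleClientset(
		&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1", Namespace: "default"}},
		&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p2", Namespace: "default"}},
	)

	// "Strategy 1" dry-run evicts p1 by deleting it from the fake tracker.
	_ = client.CoreV1().Pods("default").Delete(ctx, "p1", metav1.DeleteOptions{})

	// "Strategy 2" now sees only p2: strategy 1's eviction carried over.
	pods, _ := client.CoreV1().Pods("default").List(ctx, metav1.ListOptions{})
	fmt.Println("pods left:", len(pods.Items)) // pods left: 1
}
```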
+// runProfiles first runs all the deschedule plugins of all profiles, and
+// only then runs all the balance plugins of all profiles
+// (all balance plugins must come after all deschedule plugins).
+// See https://github.com/kubernetes-sigs/descheduler/issues/979
+func (d *descheduler) runProfiles(ctx context.Context, client clientset.Interface, nodes []*v1.Node, podEvictor *evictions.PodEvictor) {
+	var profileRunners []profileRunner
+	for _, profile := range d.deschedulerPolicy.Profiles {
+		currProfile, err := frameworkprofile.NewProfile(
+			profile,
+			pluginregistry.PluginRegistry,
+			frameworkprofile.WithClientSet(client),
+			frameworkprofile.WithSharedInformerFactory(d.sharedInformerFactory),
+			frameworkprofile.WithPodEvictor(podEvictor),
+			frameworkprofile.WithGetPodsAssignedToNodeFnc(d.getPodsAssignedToNode),
+		)
+		if err != nil {
+			klog.ErrorS(err, "unable to create a profile", "profile", profile.Name)
+			continue
+		}
+		profileRunners = append(profileRunners, profileRunner{profile.Name, currProfile.RunDeschedulePlugins, currProfile.RunBalancePlugins})
+	}
+
+	for _, profileR := range profileRunners {
+		// First deschedule
+		status := profileR.descheduleEPs(ctx, nodes)
+		if status != nil && status.Err != nil {
+			klog.ErrorS(status.Err, "running deschedule extension point failed with error", "profile", profileR.name)
+			continue
+		}
+	}
+
+	for _, profileR := range profileRunners {
+		// Then balance
+		status := profileR.balanceEPs(ctx, nodes)
+		if status != nil && status.Err != nil {
+			klog.ErrorS(status.Err, "running balance extension point failed with error", "profile", profileR.name)
+			continue
+		}
+	}
+}
+
 func Run(ctx context.Context, rs *options.DeschedulerServer) error {
 	metrics.Register()
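The two loops in runProfiles above encode the ordering guarantee from issue #979: every profile's deschedule extension point completes before any profile's balance extension point starts. A stripped-down sketch of the same two-pass pattern (plain Go; the runner type and callbacks are stand-ins for profileRunner and the framework's plugin entry points):

```go
package main

import (
	"context"
	"fmt"
)

// runner mirrors profileRunner: a profile name plus one callback per extension point.
type runner struct {
	name                string
	deschedule, balance func(ctx context.Context) error
}

// runAll reproduces the two-pass ordering: every deschedule callback runs
// before any balance callback, across all profiles.
func runAll(ctx context.Context, runners []runner) {
	for _, r := range runners {
		if err := r.deschedule(ctx); err != nil {
			fmt.Println("deschedule failed:", r.name, err)
		}
	}
	for _, r := range runners {
		if err := r.balance(ctx); err != nil {
			fmt.Println("balance failed:", r.name, err)
		}
	}
}

func main() {
	mk := func(name string) runner {
		return runner{
			name:       name,
			deschedule: func(context.Context) error { fmt.Println("deschedule", name); return nil },
			balance:    func(context.Context) error { fmt.Println("balance", name); return nil },
		}
	}
	// Prints deschedule A, deschedule B, then balance A, balance B.
	runAll(context.Background(), []runner{mk("A"), mk("B")})
}
```

Building all the profiles up front also means a profile that fails construction is skipped in both passes rather than failing midway through a cycle.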
@@ -217,22 +367,7 @@ func cachedClient(

 func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string) error {
 	sharedInformerFactory := informers.NewSharedInformerFactory(rs.Client, 0)
-	podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
-	podLister := sharedInformerFactory.Core().V1().Pods().Lister()
 	nodeLister := sharedInformerFactory.Core().V1().Nodes().Lister()
-	namespaceLister := sharedInformerFactory.Core().V1().Namespaces().Lister()
-	priorityClassLister := sharedInformerFactory.Scheduling().V1().PriorityClasses().Lister()
-
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-
-	getPodsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer)
-	if err != nil {
-		return fmt.Errorf("build get pods assigned to node function error: %v", err)
-	}
-
-	sharedInformerFactory.Start(ctx.Done())
-	sharedInformerFactory.WaitForCacheSync(ctx.Done())

 	var nodeSelector string
 	if deschedulerPolicy.NodeSelector != nil {
@@ -245,103 +380,32 @@
 	} else {
 		eventClient = rs.Client
 	}
-
 	eventBroadcaster, eventRecorder := utils.GetRecorderAndBroadcaster(ctx, eventClient)
 	defer eventBroadcaster.Shutdown()

-	cycleSharedInformerFactory := sharedInformerFactory
+	descheduler, err := newDescheduler(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion, eventRecorder, sharedInformerFactory)
+	if err != nil {
+		return err
+	}
+
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	sharedInformerFactory.Start(ctx.Done())
+	sharedInformerFactory.WaitForCacheSync(ctx.Done())

 	wait.NonSlidingUntil(func() {
-		loopStartDuration := time.Now()
-		defer metrics.DeschedulerLoopDuration.With(map[string]string{}).Observe(time.Since(loopStartDuration).Seconds())
-
 		nodes, err := nodeutil.ReadyNodes(ctx, rs.Client, nodeLister, nodeSelector)
 		if err != nil {
-			klog.V(1).InfoS("Unable to get ready nodes", "err", err)
+			klog.Error(err)
 			cancel()
 			return
 		}
-
-		if len(nodes) <= 1 {
-			klog.V(1).InfoS("The cluster size is 0 or 1 meaning eviction causes service disruption or degradation. So aborting..")
+		err = descheduler.runDeschedulerLoop(ctx, nodes)
+		if err != nil {
+			klog.Error(err)
 			cancel()
 			return
 		}
-
-		var client clientset.Interface
-		// When the dry mode is enable, collect all the relevant objects (mostly pods) under a fake client.
-		// So when evicting pods while running multiple strategies in a row have the cummulative effect
-		// as is when evicting pods for real.
-		if rs.DryRun {
-			klog.V(3).Infof("Building a cached client from the cluster for the dry run")
-			// Create a new cache so we start from scratch without any leftovers
-			fakeClient, err := cachedClient(rs.Client, podLister, nodeLister, namespaceLister, priorityClassLister)
-			if err != nil {
-				klog.Error(err)
-				return
-			}
-
-			// create a new instance of the shared informer factor from the cached client
-			fakeSharedInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
-			// register the pod informer, otherwise it will not get running
-			getPodsAssignedToNode, err = podutil.BuildGetPodsAssignedToNodeFunc(fakeSharedInformerFactory.Core().V1().Pods().Informer())
-			if err != nil {
-				klog.Errorf("build get pods assigned to node function error: %v", err)
-				return
-			}
-
-			fakeCtx, cncl := context.WithCancel(context.TODO())
-			defer cncl()
-			fakeSharedInformerFactory.Start(fakeCtx.Done())
-			fakeSharedInformerFactory.WaitForCacheSync(fakeCtx.Done())
-
-			client = fakeClient
-			cycleSharedInformerFactory = fakeSharedInformerFactory
-		} else {
-			client = rs.Client
-		}
-
-		klog.V(3).Infof("Building a pod evictor")
-		podEvictor := evictions.NewPodEvictor(
-			client,
-			evictionPolicyGroupVersion,
-			rs.DryRun,
-			deschedulerPolicy.MaxNoOfPodsToEvictPerNode,
-			deschedulerPolicy.MaxNoOfPodsToEvictPerNamespace,
-			nodes,
-			!rs.DisableMetrics,
-			eventRecorder,
-		)
-
-		for _, profile := range deschedulerPolicy.Profiles {
-			currProfile, err := frameworkprofile.NewProfile(
-				profile,
-				pluginregistry.PluginRegistry,
-				frameworkprofile.WithClientSet(client),
-				frameworkprofile.WithSharedInformerFactory(cycleSharedInformerFactory),
-				frameworkprofile.WithPodEvictor(podEvictor),
-				frameworkprofile.WithGetPodsAssignedToNodeFnc(getPodsAssignedToNode),
-			)
-			if err != nil {
-				klog.ErrorS(err, "unable to create a profile", "profile", profile.Name)
-				continue
-			}
-
-			// First deschedule
-			status := currProfile.RunDeschedulePlugins(ctx, nodes)
-			if status != nil && status.Err != nil {
-				klog.ErrorS(status.Err, "running deschedule extension point failed with error", "profile", profile.Name)
-				continue
-			}
-			// Then balance
-			status = currProfile.RunBalancePlugins(ctx, nodes)
-			if status != nil && status.Err != nil {
-				klog.ErrorS(status.Err, "running balance extension point failed with error", "profile", profile.Name)
-				continue
-			}
-		}
-
-		klog.V(1).InfoS("Number of evicted pods", "totalEvicted", podEvictor.TotalEvicted())
-
 		// If there was no interval specified, send a signal to the stopChannel to end the wait.Until loop after 1 iteration
 		if rs.DeschedulingInterval.Seconds() == 0 {
 			cancel()
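A final note on the loop's exit path: when DeschedulingInterval is zero, the closure cancels its own context, so wait.NonSlidingUntil returns after a single iteration once it re-checks the stop channel. A compact sketch of that run-once mechanism (apimachinery only; variable names are illustrative):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	interval := 0 * time.Second // stands in for rs.DeschedulingInterval

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	iterations := 0
	wait.NonSlidingUntil(func() {
		iterations++
		fmt.Println("descheduling cycle", iterations)
		// No interval configured: close the stop channel so the loop ends
		// after this single iteration, mirroring the code above.
		if interval.Seconds() == 0 {
			cancel()
		}
	}, interval, ctx.Done())

	fmt.Println("loop exited after", iterations, "iteration(s)")
}
```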