mirror of
https://github.com/kubernetes-sigs/descheduler.git
synced 2026-01-25 20:59:28 +01:00
refactor(pkg/operator): replace informerResource with a kubeClientSandbox
This commit is contained in:
@@ -86,7 +86,7 @@ type profileRunner struct {
|
||||
|
||||
type descheduler struct {
|
||||
rs *options.DeschedulerServer
|
||||
ir *informerResources
|
||||
kubeClientSandbox *kubeClientSandbox
|
||||
getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc
|
||||
sharedInformerFactory informers.SharedInformerFactory
|
||||
namespacedSecretsLister corev1listers.SecretNamespaceLister
|
||||
@@ -102,34 +102,40 @@ type descheduler struct {
|
||||
metricsProviders map[api.MetricsSource]*api.MetricsProvider
|
||||
}
|
||||
|
||||
type informerResources struct {
|
||||
// kubeClientSandbox creates a sandbox environment with a fake client and informer factory
|
||||
// that mirrors resources from a real client, useful for dry-run testing scenarios
|
||||
type kubeClientSandbox struct {
|
||||
client clientset.Interface
|
||||
sharedInformerFactory informers.SharedInformerFactory
|
||||
fakeKubeClient *fakeclientset.Clientset
|
||||
fakeFactory informers.SharedInformerFactory
|
||||
resourceToInformer map[schema.GroupVersionResource]informers.GenericInformer
|
||||
}
|
||||
|
||||
func newInformerResources(sharedInformerFactory informers.SharedInformerFactory) *informerResources {
|
||||
return &informerResources{
|
||||
func newKubeClientSandbox(client clientset.Interface, sharedInformerFactory informers.SharedInformerFactory, resources ...schema.GroupVersionResource) (*kubeClientSandbox, error) {
|
||||
sandbox := &kubeClientSandbox{
|
||||
client: client,
|
||||
sharedInformerFactory: sharedInformerFactory,
|
||||
resourceToInformer: make(map[schema.GroupVersionResource]informers.GenericInformer),
|
||||
}
|
||||
}
|
||||
|
||||
func (ir *informerResources) Uses(resources ...schema.GroupVersionResource) error {
|
||||
for _, resource := range resources {
|
||||
informer, err := ir.sharedInformerFactory.ForResource(resource)
|
||||
informer, err := sharedInformerFactory.ForResource(resource)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ir.resourceToInformer[resource] = informer
|
||||
sandbox.resourceToInformer[resource] = informer
|
||||
}
|
||||
return nil
|
||||
|
||||
return sandbox, nil
|
||||
}
|
||||
|
||||
// CopyTo Copy informer subscriptions to the new factory and objects to the fake client so that the backing caches are populated for when listers are used.
|
||||
func (ir *informerResources) CopyTo(fakeClient *fakeclientset.Clientset, newFactory informers.SharedInformerFactory) error {
|
||||
for resource, informer := range ir.resourceToInformer {
|
||||
_, err := newFactory.ForResource(resource)
|
||||
func (sandbox *kubeClientSandbox) buildSandbox() error {
|
||||
sandbox.fakeKubeClient = fakeclientset.NewSimpleClientset()
|
||||
sandbox.fakeFactory = informers.NewSharedInformerFactory(sandbox.fakeKubeClient, 0)
|
||||
|
||||
for resource, informer := range sandbox.resourceToInformer {
|
||||
_, err := sandbox.fakeFactory.ForResource(resource)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting resource %s: %w", resource, err)
|
||||
}
|
||||
@@ -140,12 +146,23 @@ func (ir *informerResources) CopyTo(fakeClient *fakeclientset.Clientset, newFact
|
||||
}
|
||||
|
||||
for _, object := range objects {
|
||||
fakeClient.Tracker().Add(object)
|
||||
if err := sandbox.fakeKubeClient.Tracker().Add(object); err != nil {
|
||||
return fmt.Errorf("error adding object to tracker: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sandbox *kubeClientSandbox) fakeClient() *fakeclientset.Clientset {
|
||||
return sandbox.fakeKubeClient
|
||||
}
|
||||
|
||||
func (sandbox *kubeClientSandbox) fakeSharedInformerFactory() informers.SharedInformerFactory {
|
||||
return sandbox.fakeFactory
|
||||
}
|
||||
|
||||
func metricsProviderListToMap(providersList []api.MetricsProvider) map[api.MetricsSource]*api.MetricsProvider {
|
||||
providersMap := make(map[api.MetricsSource]*api.MetricsProvider)
|
||||
for _, provider := range providersList {
|
||||
@@ -157,16 +174,19 @@ func metricsProviderListToMap(providersList []api.MetricsProvider) map[api.Metri
|
||||
func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string, eventRecorder events.EventRecorder, sharedInformerFactory, namespacedSharedInformerFactory informers.SharedInformerFactory) (*descheduler, error) {
|
||||
podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
|
||||
|
||||
ir := newInformerResources(sharedInformerFactory)
|
||||
ir.Uses(v1.SchemeGroupVersion.WithResource("pods"),
|
||||
// Future work could be to let each plugin declare what type of resources it needs; that way dry runs would stay
|
||||
// consistent with the real runs without having to keep the list here in sync.
|
||||
kubeClientSandbox, err := newKubeClientSandbox(rs.Client, sharedInformerFactory,
|
||||
v1.SchemeGroupVersion.WithResource("pods"),
|
||||
v1.SchemeGroupVersion.WithResource("nodes"),
|
||||
// Future work could be to let each plugin declare what type of resources it needs; that way dry runs would stay
|
||||
// consistent with the real runs without having to keep the list here in sync.
|
||||
v1.SchemeGroupVersion.WithResource("namespaces"), // Used by the defaultevictor plugin
|
||||
schedulingv1.SchemeGroupVersion.WithResource("priorityclasses"), // Used by the defaultevictor plugin
|
||||
policyv1.SchemeGroupVersion.WithResource("poddisruptionbudgets"), // Used by the defaultevictor plugin
|
||||
v1.SchemeGroupVersion.WithResource("persistentvolumeclaims"), // Used by the defaultevictor plugin
|
||||
) // Used by the defaultevictor plugin
|
||||
v1.SchemeGroupVersion.WithResource("namespaces"),
|
||||
schedulingv1.SchemeGroupVersion.WithResource("priorityclasses"),
|
||||
policyv1.SchemeGroupVersion.WithResource("poddisruptionbudgets"),
|
||||
v1.SchemeGroupVersion.WithResource("persistentvolumeclaims"),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create kube client sandbox: %v", err)
|
||||
}
|
||||
|
||||
getPodsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer)
|
||||
if err != nil {
|
||||
@@ -195,7 +215,7 @@ func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedu
|
||||
|
||||
desch := &descheduler{
|
||||
rs: rs,
|
||||
ir: ir,
|
||||
kubeClientSandbox: kubeClientSandbox,
|
||||
getPodsAssignedToNode: getPodsAssignedToNode,
|
||||
sharedInformerFactory: sharedInformerFactory,
|
||||
deschedulerPolicy: deschedulerPolicy,
|
||||
@@ -367,16 +387,17 @@ func (d *descheduler) runDeschedulerLoop(ctx context.Context) error {
|
||||
if d.rs.DryRun {
|
||||
klog.V(3).Infof("Building a cached client from the cluster for the dry run")
|
||||
// Create a new cache so we start from scratch without any leftovers
|
||||
fakeClient := fakeclientset.NewSimpleClientset()
|
||||
// simulate a pod eviction by deleting a pod
|
||||
fakeClient.PrependReactor("create", "pods", d.podEvictionReactionFnc(fakeClient))
|
||||
fakeSharedInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
|
||||
|
||||
err := d.ir.CopyTo(fakeClient, fakeSharedInformerFactory)
|
||||
err := d.kubeClientSandbox.buildSandbox()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fakeClient := d.kubeClientSandbox.fakeClient()
|
||||
fakeSharedInformerFactory := d.kubeClientSandbox.fakeSharedInformerFactory()
|
||||
|
||||
// simulate a pod eviction by deleting a pod
|
||||
fakeClient.PrependReactor("create", "pods", d.podEvictionReactionFnc(fakeClient))
|
||||
|
||||
// create a new instance of the shared informer factor from the cached client
|
||||
// register the pod informer, otherwise it will not get running
|
||||
d.getPodsAssignedToNode, err = podutil.BuildGetPodsAssignedToNodeFunc(fakeSharedInformerFactory.Core().V1().Pods().Informer())
|
||||
|
||||
Reference in New Issue
Block a user