From cf9edca33ca0a77c99bf3bd220fa3bcd4382d8a8 Mon Sep 17 00:00:00 2001 From: Jan Chaloupka Date: Sun, 4 Jan 2026 15:09:24 +0100 Subject: [PATCH] feat(profile): inject a plugin instance ID to each built plugin --- pkg/descheduler/descheduler.go | 5 +- pkg/framework/fake/fake.go | 5 + pkg/framework/fake/plugin/fake.go | 16 ++ pkg/framework/profile/profile.go | 47 +++- pkg/framework/profile/profile_test.go | 323 ++++++++++++++++++++++++++ pkg/framework/types/types.go | 3 + 6 files changed, 392 insertions(+), 7 deletions(-) diff --git a/pkg/descheduler/descheduler.go b/pkg/descheduler/descheduler.go index e6f5940d9..ee8b1540e 100644 --- a/pkg/descheduler/descheduler.go +++ b/pkg/descheduler/descheduler.go @@ -414,7 +414,7 @@ func (d *descheduler) runProfiles(ctx context.Context, client clientset.Interfac ctx, span = tracing.Tracer().Start(ctx, "runProfiles") defer span.End() var profileRunners []profileRunner - for _, profile := range d.deschedulerPolicy.Profiles { + for idx, profile := range d.deschedulerPolicy.Profiles { currProfile, err := frameworkprofile.NewProfile( ctx, profile, @@ -425,6 +425,9 @@ func (d *descheduler) runProfiles(ctx context.Context, client clientset.Interfac frameworkprofile.WithGetPodsAssignedToNodeFnc(d.getPodsAssignedToNode), frameworkprofile.WithMetricsCollector(d.metricsCollector), frameworkprofile.WithPrometheusClient(d.prometheusClient), + // Generate a unique instance ID using just the index to avoid long IDs + // when profile names are very long + frameworkprofile.WithProfileInstanceID(fmt.Sprintf("%d", idx)), ) if err != nil { klog.ErrorS(err, "unable to create a profile", "profile", profile.Name) diff --git a/pkg/framework/fake/fake.go b/pkg/framework/fake/fake.go index 681a9275b..066ecdeaf 100644 --- a/pkg/framework/fake/fake.go +++ b/pkg/framework/fake/fake.go @@ -23,6 +23,7 @@ type HandleImpl struct { PodEvictorImpl *evictions.PodEvictor MetricsCollectorImpl *metricscollector.MetricsCollector PrometheusClientImpl promapi.Client + PluginInstanceIDImpl string } var _ frameworktypes.Handle = &HandleImpl{} @@ -62,3 +63,7 @@ func (hi *HandleImpl) PreEvictionFilter(pod *v1.Pod) bool { func (hi *HandleImpl) Evict(ctx context.Context, pod *v1.Pod, opts evictions.EvictOptions) error { return hi.PodEvictorImpl.EvictPod(ctx, pod, opts) } + +func (hi *HandleImpl) PluginInstanceID() string { + return hi.PluginInstanceIDImpl +} diff --git a/pkg/framework/fake/plugin/fake.go b/pkg/framework/fake/plugin/fake.go index 327b1bb36..ccf1b63a9 100644 --- a/pkg/framework/fake/plugin/fake.go +++ b/pkg/framework/fake/plugin/fake.go @@ -73,6 +73,22 @@ func NewPluginFncFromFake(fp *FakePlugin) pluginregistry.PluginBuilder { } } +func NewPluginFncFromFakeWithReactor(fp *FakePlugin, callback func(ActionImpl)) pluginregistry.PluginBuilder { + return func(ctx context.Context, args runtime.Object, handle frameworktypes.Handle) (frameworktypes.Plugin, error) { + fakePluginArgs, ok := args.(*FakePluginArgs) + if !ok { + return nil, fmt.Errorf("want args to be of type FakePluginArgs, got %T", args) + } + + fp.handle = handle + fp.args = fakePluginArgs + + callback(ActionImpl{handle: fp.handle}) + + return fp, nil + } +} + // New builds plugin from its arguments while passing a handle func New(ctx context.Context, args runtime.Object, handle frameworktypes.Handle) (frameworktypes.Plugin, error) { fakePluginArgs, ok := args.(*FakePluginArgs) diff --git a/pkg/framework/profile/profile.go b/pkg/framework/profile/profile.go index 8cf1f82b0..3fff9057b 100644 --- a/pkg/framework/profile/profile.go +++ b/pkg/framework/profile/profile.go @@ -78,6 +78,14 @@ type handleImpl struct { var _ frameworktypes.Handle = &handleImpl{} +// pluginHandle wraps a shared handleImpl and adds a plugin-specific instance ID +type pluginHandle struct { + *handleImpl + pluginInstanceID string +} + +var _ frameworktypes.Handle = &pluginHandle{} + // ClientSet retrieves kube client set func (hi *handleImpl) ClientSet() clientset.Interface { return hi.clientSet @@ -106,6 +114,17 @@ func (hi *handleImpl) Evictor() frameworktypes.Evictor { return hi.evictor } +// PluginInstanceID returns an empty string for the base handle. +// Plugins should receive a pluginHandle which has a specific instance ID. +func (hi *handleImpl) PluginInstanceID() string { + panic(fmt.Errorf("Not implemented")) +} + +// PluginInstanceID returns a unique identifier for this plugin instance. +func (ph *pluginHandle) PluginInstanceID() string { + return ph.pluginInstanceID +} + type filterPlugin interface { frameworktypes.Plugin Filter(pod *v1.Pod) bool @@ -142,6 +161,7 @@ type handleImplOpts struct { getPodsAssignedToNodeFunc podutil.GetPodsAssignedToNodeFunc podEvictor *evictions.PodEvictor metricsCollector *metricscollector.MetricsCollector + profileInstanceID string } // WithClientSet sets clientSet for the scheduling frameworkImpl. @@ -182,6 +202,14 @@ func WithMetricsCollector(metricsCollector *metricscollector.MetricsCollector) O } } +// WithProfileInstanceID sets the profile instance ID for the handle. +// This will be used to construct unique plugin instance IDs. +func WithProfileInstanceID(profileInstanceID string) Option { + return func(o *handleImplOpts) { + o.profileInstanceID = profileInstanceID + } +} + func getPluginConfig(pluginName string, pluginConfigs []api.PluginConfig) (*api.PluginConfig, int) { for idx, pluginConfig := range pluginConfigs { if pluginConfig.Name == pluginName { @@ -191,7 +219,7 @@ func getPluginConfig(pluginName string, pluginConfigs []api.PluginConfig) (*api. return nil, 0 } -func buildPlugin(ctx context.Context, config api.DeschedulerProfile, pluginName string, handle *handleImpl, reg pluginregistry.Registry) (frameworktypes.Plugin, error) { +func buildPlugin(ctx context.Context, config api.DeschedulerProfile, pluginName string, handle frameworktypes.Handle, reg pluginregistry.Registry) (frameworktypes.Plugin, error) { pc, _ := getPluginConfig(pluginName, config.PluginConfigs) if pc == nil { klog.ErrorS(fmt.Errorf("unable to get plugin config"), "skipping plugin", "plugin", pluginName, "profile", config.Name) @@ -272,6 +300,7 @@ func NewProfile(ctx context.Context, config api.DeschedulerProfile, reg pluginre return nil, fmt.Errorf("profile %q configures preEvictionFilter extension point of non-existing plugins: %v", config.Name, sets.New(config.Plugins.PreEvictionFilter.Enabled...).Difference(pi.preEvictionFilter)) } + // Create a base handle that will be used as a template for plugin-specific handles handle := &handleImpl{ clientSet: hOpts.clientSet, getPodsAssignedToNodeFunc: hOpts.getPodsAssignedToNodeFunc, @@ -284,20 +313,26 @@ func NewProfile(ctx context.Context, config api.DeschedulerProfile, reg pluginre prometheusClient: hOpts.prometheusClient, } + // Collect all unique plugin names across all extension points pluginNames := append(config.Plugins.Deschedule.Enabled, config.Plugins.Balance.Enabled...) pluginNames = append(pluginNames, config.Plugins.Filter.Enabled...) pluginNames = append(pluginNames, config.Plugins.PreEvictionFilter.Enabled...) + // Build each unique plugin only once with a unique plugin instance ID plugins := make(map[string]frameworktypes.Plugin) - for _, plugin := range sets.New(pluginNames...).UnsortedList() { - pg, err := buildPlugin(ctx, config, plugin, handle, reg) + for idx, pluginName := range sets.New(pluginNames...).UnsortedList() { + ph := &pluginHandle{ + handleImpl: handle, + pluginInstanceID: fmt.Sprintf("%s-%d", hOpts.profileInstanceID, idx), + } + pg, err := buildPlugin(ctx, config, pluginName, ph, reg) if err != nil { - return nil, fmt.Errorf("unable to build %v plugin: %v", plugin, err) + return nil, fmt.Errorf("unable to build %v plugin: %v", pluginName, err) } if pg == nil { - return nil, fmt.Errorf("got empty %v plugin build", plugin) + return nil, fmt.Errorf("got empty %v plugin build", pluginName) } - plugins[plugin] = pg + plugins[pluginName] = pg } // Later, when a default list of plugins and their extension points is established, diff --git a/pkg/framework/profile/profile_test.go b/pkg/framework/profile/profile_test.go index 31756abc5..134c93e5e 100644 --- a/pkg/framework/profile/profile_test.go +++ b/pkg/framework/profile/profile_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sort" + "strings" "testing" "github.com/google/go-cmp/cmp" @@ -542,3 +543,325 @@ func TestProfileExtensionPointOrdering(t *testing.T) { t.Errorf("check for balance invocation order failed. Results are not deep equal. mismatch (-want +got):\n%s", diff) } } + +// verifyInstanceIDsMatch verifies that instance IDs captured at creation, deschedule, and balance match +func verifyInstanceIDsMatch(t *testing.T, profileInstanceID string, pluginNames []string, creationIDs, descheduleIDs, balanceIDs map[string]string) { + for _, pluginName := range pluginNames { + creationID := creationIDs[pluginName] + descheduleID := descheduleIDs[pluginName] + balanceID := balanceIDs[pluginName] + + if creationID == "" { + t.Errorf("Profile %s, plugin %s: plugin creation did not capture instance ID", profileInstanceID, pluginName) + } + if descheduleID == "" { + t.Errorf("Profile %s, plugin %s: deschedule extension point did not capture instance ID", profileInstanceID, pluginName) + } + if balanceID == "" { + t.Errorf("Profile %s, plugin %s: balance extension point did not capture instance ID", profileInstanceID, pluginName) + } + + // Verify all IDs match + if creationID != descheduleID { + t.Errorf("Profile %s, plugin %s: instance ID mismatch - creation: %s, deschedule: %s", profileInstanceID, pluginName, creationID, descheduleID) + } + if creationID != balanceID { + t.Errorf("Profile %s, plugin %s: instance ID mismatch - creation: %s, balance: %s", profileInstanceID, pluginName, creationID, balanceID) + } + if descheduleID != balanceID { + t.Errorf("Profile %s, plugin %s: instance ID mismatch - deschedule: %s, balance: %s", profileInstanceID, pluginName, descheduleID, balanceID) + } + } +} + +// verifyInstanceIDFormat verifies that instance IDs have correct format and sequential indices +func verifyInstanceIDFormat(t *testing.T, profileInstanceID string, pluginNames []string, pluginIDs map[string]string) sets.Set[string] { + if len(pluginIDs) != len(pluginNames) { + t.Errorf("Profile %s: expected %d plugins to be invoked, got %d", profileInstanceID, len(pluginNames), len(pluginIDs)) + } + + // Collect all instance IDs for this profile + profileInstanceIDs := sets.New[string]() + for pluginName, instanceID := range pluginIDs { + if instanceID == "" { + t.Errorf("Profile %s, plugin %s: expected instance ID to be set, got empty string", profileInstanceID, pluginName) + } + profileInstanceIDs.Insert(instanceID) + } + + // Verify all IDs within this profile are unique + if profileInstanceIDs.Len() != len(pluginIDs) { + t.Errorf("Profile %s: duplicate instance IDs found", profileInstanceID) + } + + // Verify all IDs match the expected format: "{profileInstanceID}-{index}" + // and contain sequential indices from 0 to n-1 + expectedIndices := sets.New[int]() + for i := 0; i < len(pluginNames); i++ { + expectedIndices.Insert(i) + } + actualIndices := sets.New[int]() + for pluginName, instanceID := range pluginIDs { + var idx int + expectedPrefix := profileInstanceID + "-" + if !strings.HasPrefix(instanceID, expectedPrefix) { + t.Errorf("Profile %s, plugin %s: instance ID %s does not start with %s", profileInstanceID, pluginName, instanceID, expectedPrefix) + continue + } + _, err := fmt.Sscanf(instanceID, profileInstanceID+"-%d", &idx) + if err != nil { + t.Errorf("Profile %s, plugin %s: instance ID %s does not match expected format", profileInstanceID, pluginName, instanceID) + continue + } + actualIndices.Insert(idx) + } + // Verify we have indices 0 through n-1 + diff := cmp.Diff(expectedIndices, actualIndices) + if diff != "" { + t.Errorf("Profile %s: instance ID indices mismatch (-want +got):\n%s", profileInstanceID, diff) + } + + return profileInstanceIDs +} + +func TestPluginInstanceIDs(t *testing.T) { + tests := []struct { + name string + profiles []struct { + profileInstanceID string + pluginNames []string + } + }{ + { + name: "single plugin gets instance ID", + profiles: []struct { + profileInstanceID string + pluginNames []string + }{ + { + profileInstanceID: "0", + pluginNames: []string{"TestPlugin"}, + }, + }, + }, + { + name: "two plugins get different instance IDs", + profiles: []struct { + profileInstanceID string + pluginNames []string + }{ + { + profileInstanceID: "0", + pluginNames: []string{"Plugin_0", "Plugin_1"}, + }, + }, + }, + { + name: "three profiles with two plugins each get unique instance IDs", + profiles: []struct { + profileInstanceID string + pluginNames []string + }{ + { + profileInstanceID: "0", + pluginNames: []string{"Plugin_A", "Plugin_B"}, + }, + { + profileInstanceID: "1", + pluginNames: []string{"Plugin_C", "Plugin_D"}, + }, + { + profileInstanceID: "2", + pluginNames: []string{"Plugin_E", "Plugin_F"}, + }, + }, + }, + { + name: "three profiles with same plugin names get different instance IDs per profile", + profiles: []struct { + profileInstanceID string + pluginNames []string + }{ + { + profileInstanceID: "0", + pluginNames: []string{"CommonPlugin_X", "CommonPlugin_Y"}, + }, + { + profileInstanceID: "1", + pluginNames: []string{"CommonPlugin_X", "CommonPlugin_Y"}, + }, + { + profileInstanceID: "2", + pluginNames: []string{"CommonPlugin_X", "CommonPlugin_Y"}, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + n1 := testutils.BuildTestNode("n1", 2000, 3000, 10, nil) + n2 := testutils.BuildTestNode("n2", 2000, 3000, 10, nil) + nodes := []*v1.Node{n1, n2} + + // Track instance IDs by profile from different stages + profileDescheduleIDs := make(map[string]map[string]string) // profileInstanceID -> pluginName -> instanceID (from Deschedule execution) + profileBalanceIDs := make(map[string]map[string]string) // profileInstanceID -> pluginName -> instanceID (from Balance execution) + profileCreationIDs := make(map[string]map[string]string) // profileInstanceID -> pluginName -> instanceID (from plugin creation) + registry := pluginregistry.NewRegistry() + + // Collect all distinct plugin names across all profiles + allPluginNames := sets.New[string]() + for _, profileCfg := range test.profiles { + allPluginNames.Insert(profileCfg.pluginNames...) + } + + // Helper function to validate and store instance ID + captureInstanceID := func(instanceID, pluginName string, targetMap map[string]map[string]string) { + parts := strings.Split(instanceID, "-") + if len(parts) < 2 { + t.Fatalf("Plugin %s: instance ID %s does not have expected format 'profileID-index'", pluginName, instanceID) + } + profileID := parts[0] + if targetMap[profileID] == nil { + targetMap[profileID] = make(map[string]string) + } + targetMap[profileID][pluginName] = instanceID + } + + // Register all plugins before creating profiles + for _, pluginName := range allPluginNames.UnsortedList() { + // Capture plugin name for closure + name := pluginName + + pluginregistry.Register( + pluginName, + func(ctx context.Context, args runtime.Object, handle frameworktypes.Handle) (frameworktypes.Plugin, error) { + fakePlugin := &fakeplugin.FakePlugin{PluginName: name} + + fakePlugin.AddReactor(string(frameworktypes.DescheduleExtensionPoint), func(action fakeplugin.Action) (handled, filter bool, err error) { + if dAction, ok := action.(fakeplugin.DescheduleAction); ok { + captureInstanceID(dAction.Handle().PluginInstanceID(), name, profileDescheduleIDs) + return true, false, nil + } + return false, false, nil + }) + + fakePlugin.AddReactor(string(frameworktypes.BalanceExtensionPoint), func(action fakeplugin.Action) (handled, filter bool, err error) { + if bAction, ok := action.(fakeplugin.BalanceAction); ok { + captureInstanceID(bAction.Handle().PluginInstanceID(), name, profileBalanceIDs) + return true, false, nil + } + return false, false, nil + }) + + // Use NewPluginFncFromFakeWithReactor to wrap and capture instance ID at creation + builder := fakeplugin.NewPluginFncFromFakeWithReactor(fakePlugin, func(action fakeplugin.ActionImpl) { + captureInstanceID(action.Handle().PluginInstanceID(), name, profileCreationIDs) + }) + + return builder(ctx, args, handle) + }, + &fakeplugin.FakePlugin{}, + &fakeplugin.FakePluginArgs{}, + fakeplugin.ValidateFakePluginArgs, + fakeplugin.SetDefaults_FakePluginArgs, + registry, + ) + } + + client := fakeclientset.NewSimpleClientset(n1, n2) + handle, podEvictor, err := frameworktesting.InitFrameworkHandle( + ctx, + client, + nil, + defaultevictor.DefaultEvictorArgs{}, + nil, + ) + if err != nil { + t.Fatalf("Unable to initialize a framework handle: %v", err) + } + + // Create all profiles + var profiles []*profileImpl + for _, profileCfg := range test.profiles { + + var pluginConfigs []api.PluginConfig + for _, pluginName := range profileCfg.pluginNames { + pluginConfigs = append(pluginConfigs, api.PluginConfig{ + Name: pluginName, + Args: &fakeplugin.FakePluginArgs{}, + }) + } + + prfl, err := NewProfile( + ctx, + api.DeschedulerProfile{ + Name: "test-profile", + PluginConfigs: pluginConfigs, + Plugins: api.Plugins{ + Deschedule: api.PluginSet{ + Enabled: profileCfg.pluginNames, + }, + Balance: api.PluginSet{ + Enabled: profileCfg.pluginNames, + }, + }, + }, + registry, + WithClientSet(client), + WithSharedInformerFactory(handle.SharedInformerFactoryImpl), + WithPodEvictor(podEvictor), + WithGetPodsAssignedToNodeFnc(handle.GetPodsAssignedToNodeFuncImpl), + WithProfileInstanceID(profileCfg.profileInstanceID), + ) + if err != nil { + t.Fatalf("unable to create profile: %v", err) + } + profiles = append(profiles, prfl) + } + + // Run deschedule and balance plugins for all profiles + for _, prfl := range profiles { + prfl.RunDeschedulePlugins(ctx, nodes) + prfl.RunBalancePlugins(ctx, nodes) + } + + // Verify creation, deschedule, and balance IDs all match + for _, profileCfg := range test.profiles { + verifyInstanceIDsMatch( + t, + profileCfg.profileInstanceID, + profileCfg.pluginNames, + profileCreationIDs[profileCfg.profileInstanceID], + profileDescheduleIDs[profileCfg.profileInstanceID], + profileBalanceIDs[profileCfg.profileInstanceID], + ) + } + + // Verify all plugins were invoked and have correct instance IDs + allInstanceIDs := sets.New[string]() + for _, profileCfg := range test.profiles { + profileInstanceIDs := verifyInstanceIDFormat( + t, + profileCfg.profileInstanceID, + profileCfg.pluginNames, + profileDescheduleIDs[profileCfg.profileInstanceID], + ) + allInstanceIDs = allInstanceIDs.Union(profileInstanceIDs) + } + + // Verify all instance IDs are unique across all profiles + totalExpectedPlugins := 0 + for _, profileCfg := range test.profiles { + totalExpectedPlugins += len(profileCfg.pluginNames) + } + if allInstanceIDs.Len() != totalExpectedPlugins { + t.Errorf("Expected %d unique instance IDs across all profiles, got %d", totalExpectedPlugins, allInstanceIDs.Len()) + } + }) + } +} diff --git a/pkg/framework/types/types.go b/pkg/framework/types/types.go index 2480e06b0..0041c47ad 100644 --- a/pkg/framework/types/types.go +++ b/pkg/framework/types/types.go @@ -41,6 +41,9 @@ type Handle interface { GetPodsAssignedToNodeFunc() podutil.GetPodsAssignedToNodeFunc SharedInformerFactory() informers.SharedInformerFactory MetricsCollector() *metricscollector.MetricsCollector + // PluginInstanceID returns a unique identifier for this plugin instance. + // The ID is unique across all plugin instances in a configuration. + PluginInstanceID() string } // Evictor defines an interface for filtering and evicting pods