From 3a1a3ff9d897f062cb1b8e53b3a08d4ef1a74463 Mon Sep 17 00:00:00 2001
From: Jan Chaloupka
Date: Fri, 30 Aug 2024 09:07:27 +0200
Subject: [PATCH] Introduce RequestEviction feature for evicting pods in
 background

When the feature is enabled, each pod with the
descheduler.alpha.kubernetes.io/request-evict-only annotation will have the
eviction API error examined for a specific error code/reason and message. If
matched, the eviction of such a pod will be interpreted as the initiation of
an eviction in background.
---
 pkg/descheduler/descheduler.go | 18 +-
 pkg/descheduler/descheduler_test.go | 258 ++++++++--
 pkg/descheduler/evictions/evictions.go | 413 ++++++++++++++-
 pkg/descheduler/evictions/evictions_test.go | 155 +++++-
 pkg/framework/profile/profile.go | 10 +-
 pkg/framework/testing/utils.go | 11 +-
 test/e2e/e2e_evictioninbackground_test.go | 542 ++++++++++++++++++++
 test/e2e/e2e_test.go | 11 +
 test/e2e/e2e_toomanyrestarts_test.go | 1 +
 test/e2e/test_clientconnection_test.go | 1 +
 test/run-e2e-tests.sh | 62 ++-
 11 files changed, 1405 insertions(+), 77 deletions(-)
 create mode 100644 test/e2e/e2e_evictioninbackground_test.go

diff --git a/pkg/descheduler/descheduler.go b/pkg/descheduler/descheduler.go
index c039bdac1..a38803706 100644
--- a/pkg/descheduler/descheduler.go
+++ b/pkg/descheduler/descheduler.go
@@ -125,7 +125,8 @@ func (ir *informerResources) CopyTo(fakeClient *fakeclientset.Clientset, newFact
 	return nil
 }
-func newDescheduler(rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string, eventRecorder events.EventRecorder, sharedInformerFactory informers.SharedInformerFactory) (*descheduler, error) {
+func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string, eventRecorder events.EventRecorder, sharedInformerFactory informers.SharedInformerFactory,
+) (*descheduler, error) {
 	podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
 	ir := newInformerResources(sharedInformerFactory)
@@ -144,9 +145,12 @@ func newDescheduler(rs *options.DeschedulerServer, deschedulerPolicy *api.Desche
 		return nil, fmt.Errorf("build get pods assigned to node function error: %v", err)
 	}
-	podEvictor := evictions.NewPodEvictor(
-		nil,
+	podEvictor, err := evictions.NewPodEvictor(
+		ctx,
+		rs.Client,
 		eventRecorder,
+		podInformer,
+		rs.DefaultFeatureGates,
 		evictions.NewOptions().
 			WithPolicyGroupVersion(evictionPolicyGroupVersion).
 			WithMaxPodsToEvictPerNode(deschedulerPolicy.MaxNoOfPodsToEvictPerNode).
@@ -155,6 +159,9 @@ func newDescheduler(rs *options.DeschedulerServer, deschedulerPolicy *api.Desche
 			WithDryRun(rs.DryRun).
WithMetricsEnabled(!rs.DisableMetrics), ) + if err != nil { + return nil, err + } return &descheduler{ rs: rs, @@ -223,7 +230,7 @@ func (d *descheduler) runDeschedulerLoop(ctx context.Context, nodes []*v1.Node) d.runProfiles(ctx, client, nodes) - klog.V(1).InfoS("Number of evicted pods", "totalEvicted", d.podEvictor.TotalEvicted()) + klog.V(1).InfoS("Number of evictions/requests", "totalEvicted", d.podEvictor.TotalEvicted(), "evictionRequests", d.podEvictor.TotalEvictionRequests()) return nil } @@ -404,7 +411,7 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer eventBroadcaster, eventRecorder := utils.GetRecorderAndBroadcaster(ctx, eventClient) defer eventBroadcaster.Shutdown() - descheduler, err := newDescheduler(rs, deschedulerPolicy, evictionPolicyGroupVersion, eventRecorder, sharedInformerFactory) + descheduler, err := newDescheduler(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion, eventRecorder, sharedInformerFactory) if err != nil { span.AddEvent("Failed to create new descheduler", trace.WithAttributes(attribute.String("err", err.Error()))) return err @@ -414,6 +421,7 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer sharedInformerFactory.Start(ctx.Done()) sharedInformerFactory.WaitForCacheSync(ctx.Done()) + descheduler.podEvictor.WaitForEventHandlersSync(ctx) wait.NonSlidingUntil(func() { // A next context is created here intentionally to avoid nesting the spans via context. diff --git a/pkg/descheduler/descheduler_test.go b/pkg/descheduler/descheduler_test.go index 76bbf2e96..22f8f7cb3 100644 --- a/pkg/descheduler/descheduler_test.go +++ b/pkg/descheduler/descheduler_test.go @@ -2,12 +2,16 @@ package descheduler import ( "context" + "errors" "fmt" + "math/rand" + "net/http" "testing" "time" v1 "k8s.io/api/core/v1" policy "k8s.io/api/policy/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" apiversion "k8s.io/apimachinery/pkg/version" @@ -15,10 +19,13 @@ import ( "k8s.io/client-go/informers" fakeclientset "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" + "k8s.io/component-base/featuregate" "k8s.io/klog/v2" utilptr "k8s.io/utils/ptr" "sigs.k8s.io/descheduler/cmd/descheduler/app/options" "sigs.k8s.io/descheduler/pkg/api" + "sigs.k8s.io/descheduler/pkg/descheduler/evictions" + "sigs.k8s.io/descheduler/pkg/features" "sigs.k8s.io/descheduler/pkg/framework/pluginregistry" "sigs.k8s.io/descheduler/pkg/framework/plugins/defaultevictor" "sigs.k8s.io/descheduler/pkg/framework/plugins/removeduplicates" @@ -28,6 +35,26 @@ import ( "sigs.k8s.io/descheduler/test" ) +var ( + podEvictionError = errors.New("PodEvictionError") + tooManyRequestsError = &apierrors.StatusError{ + ErrStatus: metav1.Status{ + Status: metav1.StatusFailure, + Code: http.StatusTooManyRequests, + Reason: metav1.StatusReasonTooManyRequests, + Message: "admission webhook \"virt-launcher-eviction-interceptor.kubevirt.io\" denied the request: Eviction triggered evacuation of VMI", + }, + } +) + +func initFeatureGates() featuregate.FeatureGate { + featureGates := featuregate.NewFeatureGate() + featureGates.Add(map[featuregate.Feature]featuregate.FeatureSpec{ + features.EvictionsInBackground: {Default: false, PreRelease: featuregate.Alpha}, + }) + return featureGates +} + func initPluginRegistry() { pluginregistry.PluginRegistry = pluginregistry.NewRegistry() pluginregistry.Register(removeduplicates.PluginName, removeduplicates.New, 
&removeduplicates.RemoveDuplicates{}, &removeduplicates.RemoveDuplicatesArgs{}, removeduplicates.ValidateRemoveDuplicatesArgs, removeduplicates.SetDefaults_RemoveDuplicatesArgs, pluginregistry.PluginRegistry) @@ -99,7 +126,7 @@ func removeDuplicatesPolicy() *api.DeschedulerPolicy { } } -func initDescheduler(t *testing.T, ctx context.Context, internalDeschedulerPolicy *api.DeschedulerPolicy, objects ...runtime.Object) (*options.DeschedulerServer, *descheduler, *fakeclientset.Clientset) { +func initDescheduler(t *testing.T, ctx context.Context, featureGates featuregate.FeatureGate, internalDeschedulerPolicy *api.DeschedulerPolicy, objects ...runtime.Object) (*options.DeschedulerServer, *descheduler, *fakeclientset.Clientset) { client := fakeclientset.NewSimpleClientset(objects...) eventClient := fakeclientset.NewSimpleClientset(objects...) @@ -109,11 +136,12 @@ func initDescheduler(t *testing.T, ctx context.Context, internalDeschedulerPolic } rs.Client = client rs.EventClient = eventClient + rs.DefaultFeatureGates = featureGates sharedInformerFactory := informers.NewSharedInformerFactoryWithOptions(rs.Client, 0, informers.WithTransform(trimManagedFields)) eventBroadcaster, eventRecorder := utils.GetRecorderAndBroadcaster(ctx, client) - descheduler, err := newDescheduler(rs, internalDeschedulerPolicy, "v1", eventRecorder, sharedInformerFactory) + descheduler, err := newDescheduler(ctx, rs, internalDeschedulerPolicy, "v1", eventRecorder, sharedInformerFactory) if err != nil { eventBroadcaster.Shutdown() t.Fatalf("Unable to create a descheduler instance: %v", err) @@ -144,6 +172,7 @@ func TestTaintsUpdated(t *testing.T) { } rs.Client = client rs.EventClient = eventClient + rs.DefaultFeatureGates = initFeatureGates() pods, err := client.CoreV1().Pods(p1.Namespace).List(ctx, metav1.ListOptions{}) if err != nil { @@ -167,7 +196,7 @@ func TestTaintsUpdated(t *testing.T) { } var evictedPods []string - client.PrependReactor("create", "pods", podEvictionReactionTestingFnc(&evictedPods)) + client.PrependReactor("create", "pods", podEvictionReactionTestingFnc(&evictedPods, nil, nil)) if err := RunDeschedulerStrategies(ctx, rs, removePodsViolatingNodeTaintsPolicy(), "v1"); err != nil { t.Fatalf("Unable to run descheduler strategies: %v", err) @@ -206,6 +235,7 @@ func TestDuplicate(t *testing.T) { } rs.Client = client rs.EventClient = eventClient + rs.DefaultFeatureGates = initFeatureGates() pods, err := client.CoreV1().Pods(p1.Namespace).List(ctx, metav1.ListOptions{}) if err != nil { @@ -217,7 +247,7 @@ func TestDuplicate(t *testing.T) { } var evictedPods []string - client.PrependReactor("create", "pods", podEvictionReactionTestingFnc(&evictedPods)) + client.PrependReactor("create", "pods", podEvictionReactionTestingFnc(&evictedPods, nil, nil)) if err := RunDeschedulerStrategies(ctx, rs, removeDuplicatesPolicy(), "v1"); err != nil { t.Fatalf("Unable to run descheduler strategies: %v", err) @@ -245,6 +275,7 @@ func TestRootCancel(t *testing.T) { rs.Client = client rs.EventClient = eventClient rs.DeschedulingInterval = 100 * time.Millisecond + rs.DefaultFeatureGates = initFeatureGates() errChan := make(chan error, 1) defer close(errChan) @@ -280,6 +311,7 @@ func TestRootCancelWithNoInterval(t *testing.T) { rs.Client = client rs.EventClient = eventClient rs.DeschedulingInterval = 0 + rs.DefaultFeatureGates = initFeatureGates() errChan := make(chan error, 1) defer close(errChan) @@ -358,7 +390,7 @@ func TestValidateVersionCompatibility(t *testing.T) { } } -func podEvictionReactionTestingFnc(evictedPods 
*[]string) func(action core.Action) (bool, runtime.Object, error) { +func podEvictionReactionTestingFnc(evictedPods *[]string, isEvictionsInBackground func(podName string) bool, evictionErr error) func(action core.Action) (bool, runtime.Object, error) { return func(action core.Action) (bool, runtime.Object, error) { if action.GetSubresource() == "eviction" { createAct, matched := action.(core.CreateActionImpl) @@ -366,7 +398,14 @@ func podEvictionReactionTestingFnc(evictedPods *[]string) func(action core.Actio return false, nil, fmt.Errorf("unable to convert action to core.CreateActionImpl") } if eviction, matched := createAct.Object.(*policy.Eviction); matched { + if isEvictionsInBackground != nil && isEvictionsInBackground(eviction.GetName()) { + return true, nil, tooManyRequestsError + } + if evictionErr != nil { + return true, nil, evictionErr + } *evictedPods = append(*evictedPods, eviction.GetName()) + return true, nil, nil } } return false, nil, nil // fallback to the default reactor @@ -402,15 +441,15 @@ func TestPodEvictorReset(t *testing.T) { internalDeschedulerPolicy := removePodsViolatingNodeTaintsPolicy() ctxCancel, cancel := context.WithCancel(ctx) - rs, descheduler, client := initDescheduler(t, ctxCancel, internalDeschedulerPolicy, node1, node2, p1, p2) + rs, descheduler, client := initDescheduler(t, ctxCancel, initFeatureGates(), internalDeschedulerPolicy, node1, node2, p1, p2) defer cancel() var evictedPods []string - client.PrependReactor("create", "pods", podEvictionReactionTestingFnc(&evictedPods)) + client.PrependReactor("create", "pods", podEvictionReactionTestingFnc(&evictedPods, nil, nil)) var fakeEvictedPods []string descheduler.podEvictionReactionFnc = func(*fakeclientset.Clientset) func(action core.Action) (bool, runtime.Object, error) { - return podEvictionReactionTestingFnc(&fakeEvictedPods) + return podEvictionReactionTestingFnc(&fakeEvictedPods, nil, nil) } // a single pod eviction expected @@ -453,6 +492,138 @@ func TestPodEvictorReset(t *testing.T) { } } +func checkTotals(t *testing.T, ctx context.Context, descheduler *descheduler, totalEvictionRequests, totalEvicted uint) { + if total := descheduler.podEvictor.TotalEvictionRequests(); total != totalEvictionRequests { + t.Fatalf("Expected %v total eviction requests, got %v instead", totalEvictionRequests, total) + } + if total := descheduler.podEvictor.TotalEvicted(); total != totalEvicted { + t.Fatalf("Expected %v total evictions, got %v instead", totalEvicted, total) + } + t.Logf("Total evictions: %v, total eviction requests: %v, total evictions and eviction requests: %v", totalEvicted, totalEvictionRequests, totalEvicted+totalEvictionRequests) +} + +func runDeschedulingCycleAndCheckTotals(t *testing.T, ctx context.Context, nodes []*v1.Node, descheduler *descheduler, totalEvictionRequests, totalEvicted uint) { + err := descheduler.runDeschedulerLoop(ctx, nodes) + if err != nil { + t.Fatalf("Unable to run a descheduling loop: %v", err) + } + checkTotals(t, ctx, descheduler, totalEvictionRequests, totalEvicted) +} + +func TestEvictionRequestsCache(t *testing.T) { + initPluginRegistry() + + ctx := context.Background() + node1 := test.BuildTestNode("n1", 2000, 3000, 10, taintNodeNoSchedule) + node2 := test.BuildTestNode("n2", 2000, 3000, 10, nil) + nodes := []*v1.Node{node1, node2} + + ownerRef1 := test.GetReplicaSetOwnerRefList() + updatePod := func(pod *v1.Pod) { + pod.Namespace = "dev" + pod.ObjectMeta.OwnerReferences = ownerRef1 + pod.Status.Phase = v1.PodRunning + } + updatePodWithEvictionInBackground := 
func(pod *v1.Pod) { + updatePod(pod) + pod.Annotations = map[string]string{ + evictions.EvictionRequestAnnotationKey: "", + } + } + + p1 := test.BuildTestPod("p1", 100, 0, node1.Name, updatePodWithEvictionInBackground) + p2 := test.BuildTestPod("p2", 100, 0, node1.Name, updatePodWithEvictionInBackground) + p3 := test.BuildTestPod("p3", 100, 0, node1.Name, updatePod) + p4 := test.BuildTestPod("p4", 100, 0, node1.Name, updatePod) + p5 := test.BuildTestPod("p5", 100, 0, node1.Name, updatePod) + + internalDeschedulerPolicy := removePodsViolatingNodeTaintsPolicy() + ctxCancel, cancel := context.WithCancel(ctx) + featureGates := featuregate.NewFeatureGate() + featureGates.Add(map[featuregate.Feature]featuregate.FeatureSpec{ + features.EvictionsInBackground: {Default: true, PreRelease: featuregate.Alpha}, + }) + _, descheduler, client := initDescheduler(t, ctxCancel, featureGates, internalDeschedulerPolicy, node1, node2, p1, p2, p3, p4) + defer cancel() + + var fakeEvictedPods []string + descheduler.podEvictionReactionFnc = func(*fakeclientset.Clientset) func(action core.Action) (bool, runtime.Object, error) { + return podEvictionReactionTestingFnc(&fakeEvictedPods, nil, podEvictionError) + } + + var evictedPods []string + client.PrependReactor("create", "pods", podEvictionReactionTestingFnc(&evictedPods, func(name string) bool { return name == "p1" || name == "p2" }, nil)) + + klog.Infof("2 evictions in background expected, 2 normal evictions") + runDeschedulingCycleAndCheckTotals(t, ctx, nodes, descheduler, 2, 2) + + klog.Infof("Repeat the same as previously to confirm no more evictions in background are requested") + // No evicted pod is actually deleted on purpose so the test can run the descheduling cycle repeatedly + // without recreating the pods. + runDeschedulingCycleAndCheckTotals(t, ctx, nodes, descheduler, 2, 2) + + klog.Infof("Scenario: Eviction in background got initiated") + p2.Annotations[evictions.EvictionInProgressAnnotationKey] = "" + if _, err := client.CoreV1().Pods(p2.Namespace).Update(context.TODO(), p2, metav1.UpdateOptions{}); err != nil { + t.Fatalf("unable to update a pod: %v", err) + } + time.Sleep(100 * time.Millisecond) + + klog.Infof("Repeat the same as previously to confirm no more evictions in background are requested") + runDeschedulingCycleAndCheckTotals(t, ctx, nodes, descheduler, 2, 2) + + klog.Infof("Scenario: Another eviction in background got initiated") + p1.Annotations[evictions.EvictionInProgressAnnotationKey] = "" + if _, err := client.CoreV1().Pods(p1.Namespace).Update(context.TODO(), p1, metav1.UpdateOptions{}); err != nil { + t.Fatalf("unable to update a pod: %v", err) + } + time.Sleep(100 * time.Millisecond) + + klog.Infof("Repeat the same as previously to confirm no more evictions in background are requested") + runDeschedulingCycleAndCheckTotals(t, ctx, nodes, descheduler, 2, 2) + + klog.Infof("Scenario: Eviction in background completed") + if err := client.CoreV1().Pods(p1.Namespace).Delete(context.TODO(), p1.Name, metav1.DeleteOptions{}); err != nil { + t.Fatalf("unable to delete a pod: %v", err) + } + time.Sleep(100 * time.Millisecond) + + klog.Infof("Check the number of evictions in background decreased") + runDeschedulingCycleAndCheckTotals(t, ctx, nodes, descheduler, 1, 2) + + klog.Infof("Scenario: A new pod without eviction in background added") + if _, err := client.CoreV1().Pods(p5.Namespace).Create(context.TODO(), p5, metav1.CreateOptions{}); err != nil { + t.Fatalf("unable to create a pod: %v", err) + } + time.Sleep(100 * 
time.Millisecond) + + klog.Infof("Check the number of evictions increased after running a descheduling cycle") + runDeschedulingCycleAndCheckTotals(t, ctx, nodes, descheduler, 1, 3) + + klog.Infof("Scenario: Eviction in background canceled => eviction in progress annotation removed") + delete(p2.Annotations, evictions.EvictionInProgressAnnotationKey) + if _, err := client.CoreV1().Pods(p2.Namespace).Update(context.TODO(), p2, metav1.UpdateOptions{}); err != nil { + t.Fatalf("unable to update a pod: %v", err) + } + time.Sleep(100 * time.Millisecond) + + klog.Infof("Check the number of evictions in background decreased") + checkTotals(t, ctx, descheduler, 0, 3) + + klog.Infof("Scenario: Re-run the descheduling cycle to re-request eviction in background") + runDeschedulingCycleAndCheckTotals(t, ctx, nodes, descheduler, 1, 3) + + klog.Infof("Scenario: Eviction in background completed with a pod in completed state") + p2.Status.Phase = v1.PodSucceeded + if _, err := client.CoreV1().Pods(p2.Namespace).Update(context.TODO(), p2, metav1.UpdateOptions{}); err != nil { + t.Fatalf("unable to delete a pod: %v", err) + } + time.Sleep(100 * time.Millisecond) + + klog.Infof("Check the number of evictions in background decreased") + runDeschedulingCycleAndCheckTotals(t, ctx, nodes, descheduler, 0, 3) +} + func TestDeschedulingLimits(t *testing.T) { initPluginRegistry() @@ -496,6 +667,13 @@ func TestDeschedulingLimits(t *testing.T) { pod.ObjectMeta.OwnerReferences = ownerRef1 } + updatePodWithEvictionInBackground := func(pod *v1.Pod) { + updatePod(pod) + pod.Annotations = map[string]string{ + evictions.EvictionRequestAnnotationKey: "", + } + } + for _, tc := range tests { t.Run(tc.description, func(t *testing.T) { ctx := context.Background() @@ -503,39 +681,59 @@ func TestDeschedulingLimits(t *testing.T) { node2 := test.BuildTestNode("n2", 2000, 3000, 10, nil) nodes := []*v1.Node{node1, node2} ctxCancel, cancel := context.WithCancel(ctx) - _, descheduler, client := initDescheduler(t, ctxCancel, tc.policy, node1, node2) + featureGates := featuregate.NewFeatureGate() + featureGates.Add(map[featuregate.Feature]featuregate.FeatureSpec{ + features.EvictionsInBackground: {Default: true, PreRelease: featuregate.Alpha}, + }) + _, descheduler, client := initDescheduler(t, ctxCancel, featureGates, tc.policy, node1, node2) defer cancel() + var fakeEvictedPods []string + descheduler.podEvictionReactionFnc = func(*fakeclientset.Clientset) func(action core.Action) (bool, runtime.Object, error) { + return podEvictionReactionTestingFnc(&fakeEvictedPods, nil, podEvictionError) + } + + var evictedPods []string + client.PrependReactor("create", "pods", podEvictionReactionTestingFnc(&evictedPods, func(name string) bool { return name == "p1" || name == "p2" }, nil)) + + rand.Seed(time.Now().UnixNano()) pods := []*v1.Pod{ - test.BuildTestPod("p1", 100, 0, node1.Name, updatePod), - test.BuildTestPod("p2", 100, 0, node1.Name, updatePod), + test.BuildTestPod("p1", 100, 0, node1.Name, updatePodWithEvictionInBackground), + test.BuildTestPod("p2", 100, 0, node1.Name, updatePodWithEvictionInBackground), test.BuildTestPod("p3", 100, 0, node1.Name, updatePod), test.BuildTestPod("p4", 100, 0, node1.Name, updatePod), test.BuildTestPod("p5", 100, 0, node1.Name, updatePod), } - for j := 0; j < 5; j++ { - idx := j - if _, err := client.CoreV1().Pods(pods[idx].Namespace).Create(context.TODO(), pods[idx], metav1.CreateOptions{}); err != nil { - t.Fatalf("unable to create a pod: %v", err) - } - defer func() { - if err := 
client.CoreV1().Pods(pods[idx].Namespace).Delete(context.TODO(), pods[idx].Name, metav1.DeleteOptions{}); err != nil { - t.Fatalf("unable to delete a pod: %v", err) + for i := 0; i < 10; i++ { + rand.Shuffle(len(pods), func(i, j int) { pods[i], pods[j] = pods[j], pods[i] }) + func() { + for j := 0; j < 5; j++ { + idx := j + if _, err := client.CoreV1().Pods(pods[idx].Namespace).Create(context.TODO(), pods[idx], metav1.CreateOptions{}); err != nil { + t.Fatalf("unable to create a pod: %v", err) + } + defer func() { + if err := client.CoreV1().Pods(pods[idx].Namespace).Delete(context.TODO(), pods[idx].Name, metav1.DeleteOptions{}); err != nil { + t.Fatalf("unable to delete a pod: %v", err) + } + }() } + time.Sleep(100 * time.Millisecond) + + klog.Infof("2 evictions in background expected, 2 normal evictions") + err := descheduler.runDeschedulerLoop(ctx, nodes) + if err != nil { + t.Fatalf("Unable to run a descheduling loop: %v", err) + } + totalERs := descheduler.podEvictor.TotalEvictionRequests() + totalEs := descheduler.podEvictor.TotalEvicted() + if totalERs+totalEs > tc.limit { + t.Fatalf("Expected %v evictions and eviction requests in total, got %v instead", tc.limit, totalERs+totalEs) + } + t.Logf("Total evictions and eviction requests: %v (er=%v, e=%v)", totalERs+totalEs, totalERs, totalEs) }() } - time.Sleep(100 * time.Millisecond) - - err := descheduler.runDeschedulerLoop(ctx, nodes) - if err != nil { - t.Fatalf("Unable to run a descheduling loop: %v", err) - } - totalEs := descheduler.podEvictor.TotalEvicted() - if totalEs > tc.limit { - t.Fatalf("Expected %v evictions in total, got %v instead", tc.limit, totalEs) - } - t.Logf("Total evictions: %v", totalEs) }) } } diff --git a/pkg/descheduler/evictions/evictions.go b/pkg/descheduler/evictions/evictions.go index 04b242702..b5857ba9d 100644 --- a/pkg/descheduler/evictions/evictions.go +++ b/pkg/descheduler/evictions/evictions.go @@ -19,7 +19,9 @@ package evictions import ( "context" "fmt" + "strings" "sync" + "time" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -27,15 +29,176 @@ import ( policy "k8s.io/api/policy/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/events" + "k8s.io/component-base/featuregate" "k8s.io/klog/v2" - "sigs.k8s.io/descheduler/metrics" + "sigs.k8s.io/descheduler/metrics" eutils "sigs.k8s.io/descheduler/pkg/descheduler/evictions/utils" + "sigs.k8s.io/descheduler/pkg/features" "sigs.k8s.io/descheduler/pkg/tracing" ) +var ( + assumedEvictionRequestTimeoutSeconds uint = 10 * 60 // 10 minutes + evictionRequestsCacheResyncPeriod time.Duration = 10 * time.Minute + // syncedPollPeriod controls how often you look at the status of your sync funcs + syncedPollPeriod = 100 * time.Millisecond +) + +type evictionRequestItem struct { + podName, podNamespace, podNodeName string + evictionAssumed bool + assumedTimestamp metav1.Time +} + +type evictionRequestsCache struct { + mu sync.RWMutex + requests map[string]evictionRequestItem + requestsPerNode map[string]uint + requestsPerNamespace map[string]uint + requestsTotal uint + assumedRequestTimeoutSeconds uint +} + +func newEvictionRequestsCache(assumedRequestTimeoutSeconds uint) *evictionRequestsCache { + return &evictionRequestsCache{ + requests: make(map[string]evictionRequestItem), + requestsPerNode: make(map[string]uint), + requestsPerNamespace: 
make(map[string]uint),
+		assumedRequestTimeoutSeconds: assumedRequestTimeoutSeconds,
+	}
+}
+
+func (erc *evictionRequestsCache) run(ctx context.Context) {
+	wait.UntilWithContext(ctx, erc.cleanCache, evictionRequestsCacheResyncPeriod)
+}
+
+// cleanCache removes all assumed entries that have not been confirmed
+// for more than a specified timeout
+func (erc *evictionRequestsCache) cleanCache(ctx context.Context) {
+	erc.mu.Lock()
+	defer erc.mu.Unlock()
+	klog.V(4).Infof("Cleaning cache of assumed eviction requests in background")
+	for uid, item := range erc.requests {
+		if item.evictionAssumed {
+			requestAgeSeconds := uint(metav1.Now().Sub(item.assumedTimestamp.Local()).Seconds())
+			if requestAgeSeconds > erc.assumedRequestTimeoutSeconds {
+				klog.V(4).InfoS("Assumed eviction request in background timed out, deleting", "timeout", erc.assumedRequestTimeoutSeconds, "podNamespace", item.podNamespace, "podName", item.podName)
+				erc.deleteItem(uid)
+			}
+		}
+	}
+}
+
+func (erc *evictionRequestsCache) evictionRequestsPerNode(nodeName string) uint {
+	erc.mu.RLock()
+	defer erc.mu.RUnlock()
+	return erc.requestsPerNode[nodeName]
+}
+
+func (erc *evictionRequestsCache) evictionRequestsPerNamespace(ns string) uint {
+	erc.mu.RLock()
+	defer erc.mu.RUnlock()
+	return erc.requestsPerNamespace[ns]
+}
+
+func (erc *evictionRequestsCache) evictionRequestsTotal() uint {
+	erc.mu.RLock()
+	defer erc.mu.RUnlock()
+	return erc.requestsTotal
+}
+
+func (erc *evictionRequestsCache) TotalEvictionRequests() uint {
+	erc.mu.RLock()
+	defer erc.mu.RUnlock()
+	return uint(len(erc.requests))
+}
+
+// getPodKey returns the string key of a pod.
+func getPodKey(pod *v1.Pod) string {
+	uid := string(pod.UID)
+	// Every pod is expected to have the UID set.
+	// When the descheduling framework is used for simulation,
+	// a user-created workload may forget to set the UID.
+ if len(uid) == 0 { + panic(fmt.Errorf("cannot get cache key for %v/%v pod with empty UID", pod.Namespace, pod.Name)) + } + return uid +} + +func (erc *evictionRequestsCache) addPod(pod *v1.Pod) { + erc.mu.Lock() + defer erc.mu.Unlock() + uid := getPodKey(pod) + if _, exists := erc.requests[uid]; exists { + return + } + erc.requests[uid] = evictionRequestItem{podNamespace: pod.Namespace, podName: pod.Name, podNodeName: pod.Spec.NodeName} + erc.requestsPerNode[pod.Spec.NodeName]++ + erc.requestsPerNamespace[pod.Namespace]++ + erc.requestsTotal++ +} + +func (erc *evictionRequestsCache) assumePod(pod *v1.Pod) { + erc.mu.Lock() + defer erc.mu.Unlock() + uid := getPodKey(pod) + if _, exists := erc.requests[uid]; exists { + return + } + erc.requests[uid] = evictionRequestItem{ + podNamespace: pod.Namespace, + podName: pod.Name, + podNodeName: pod.Spec.NodeName, + evictionAssumed: true, + assumedTimestamp: metav1.NewTime(time.Now()), + } + erc.requestsPerNode[pod.Spec.NodeName]++ + erc.requestsPerNamespace[pod.Namespace]++ + erc.requestsTotal++ +} + +// no locking, expected to be invoked from protected methods only +func (erc *evictionRequestsCache) deleteItem(uid string) { + erc.requestsPerNode[erc.requests[uid].podNodeName]-- + if erc.requestsPerNode[erc.requests[uid].podNodeName] == 0 { + delete(erc.requestsPerNode, erc.requests[uid].podNodeName) + } + erc.requestsPerNamespace[erc.requests[uid].podNamespace]-- + if erc.requestsPerNamespace[erc.requests[uid].podNamespace] == 0 { + delete(erc.requestsPerNamespace, erc.requests[uid].podNamespace) + } + erc.requestsTotal-- + delete(erc.requests, uid) +} + +func (erc *evictionRequestsCache) deletePod(pod *v1.Pod) { + erc.mu.Lock() + defer erc.mu.Unlock() + uid := getPodKey(pod) + if _, exists := erc.requests[uid]; exists { + erc.deleteItem(uid) + } +} + +func (erc *evictionRequestsCache) hasPod(pod *v1.Pod) bool { + erc.mu.RLock() + defer erc.mu.RUnlock() + uid := getPodKey(pod) + _, exists := erc.requests[uid] + return exists +} + +var ( + EvictionRequestAnnotationKey = "descheduler.alpha.kubernetes.io/request-evict-only" + EvictionInProgressAnnotationKey = "descheduler.alpha.kubernetes.io/eviction-in-progress" + EvictionInBackgroundErrorText = "Eviction triggered evacuation" +) + // nodePodEvictedCount keeps count of pods evicted on node type ( nodePodEvictedCount map[string]uint @@ -43,7 +206,7 @@ type ( ) type PodEvictor struct { - mu sync.Mutex + mu sync.RWMutex client clientset.Interface policyGroupVersion string dryRun bool @@ -55,18 +218,26 @@ type PodEvictor struct { totalPodCount uint metricsEnabled bool eventRecorder events.EventRecorder + erCache *evictionRequestsCache + featureGates featuregate.FeatureGate + + // registeredHandlers contains the registrations of all handlers. It's used to check if all handlers have finished syncing before the scheduling cycles start. 
+	registeredHandlers []cache.ResourceEventHandlerRegistration
 }
 func NewPodEvictor(
+	ctx context.Context,
 	client clientset.Interface,
 	eventRecorder events.EventRecorder,
+	podInformer cache.SharedIndexInformer,
+	featureGates featuregate.FeatureGate,
 	options *Options,
-) *PodEvictor {
+) (*PodEvictor, error) {
 	if options == nil {
 		options = NewOptions()
 	}
-	return &PodEvictor{
+	podEvictor := &PodEvictor{
 		client: client,
 		eventRecorder: eventRecorder,
 		policyGroupVersion: options.policyGroupVersion,
@@ -77,20 +248,140 @@ func NewPodEvictor(
 		metricsEnabled: options.metricsEnabled,
 		nodePodCount: make(nodePodEvictedCount),
 		namespacePodCount: make(namespacePodEvictCount),
+		featureGates: featureGates,
 	}
+
+	if featureGates.Enabled(features.EvictionsInBackground) {
+		erCache := newEvictionRequestsCache(assumedEvictionRequestTimeoutSeconds)
+
+		handlerRegistration, err := podInformer.AddEventHandler(
+			cache.ResourceEventHandlerFuncs{
+				AddFunc: func(obj interface{}) {
+					pod, ok := obj.(*v1.Pod)
+					if !ok {
+						klog.ErrorS(nil, "Cannot convert to *v1.Pod", "obj", obj)
+						return
+					}
+					if _, exists := pod.Annotations[EvictionRequestAnnotationKey]; exists {
+						if _, exists := pod.Annotations[EvictionInProgressAnnotationKey]; exists {
+							// Ignore completed/succeeded or failed pods
+							if pod.Status.Phase != v1.PodSucceeded && pod.Status.Phase != v1.PodFailed {
+								klog.V(3).InfoS("Eviction in background detected. Adding pod to the cache.", "pod", klog.KObj(pod))
+								erCache.addPod(pod)
+							}
+						}
+					}
+				},
+				UpdateFunc: func(oldObj, newObj interface{}) {
+					oldPod, ok := oldObj.(*v1.Pod)
+					if !ok {
+						klog.ErrorS(nil, "Cannot convert oldObj to *v1.Pod", "oldObj", oldObj)
+						return
+					}
+					newPod, ok := newObj.(*v1.Pod)
+					if !ok {
+						klog.ErrorS(nil, "Cannot convert newObj to *v1.Pod", "newObj", newObj)
+						return
+					}
+					// Ignore pods that are not subject to an eviction in background
+					if _, exists := newPod.Annotations[EvictionRequestAnnotationKey]; !exists {
+						if erCache.hasPod(newPod) {
+							klog.V(3).InfoS("Pod with eviction in background lost annotation. Removing pod from the cache.", "pod", klog.KObj(newPod))
+						}
+						erCache.deletePod(newPod)
+						return
+					}
+					// Remove completed/succeeded or failed pods from the cache
+					if newPod.Status.Phase == v1.PodSucceeded || newPod.Status.Phase == v1.PodFailed {
+						klog.V(3).InfoS("Pod with eviction in background completed. Removing pod from the cache.", "pod", klog.KObj(newPod))
+						erCache.deletePod(newPod)
+						return
+					}
+					// Ignore any pod that does not have eviction in progress
+					if _, exists := newPod.Annotations[EvictionInProgressAnnotationKey]; !exists {
+						// In case the EvictionInProgressAnnotationKey annotation is not present/removed,
+						// it's unclear whether the eviction was restarted or terminated.
+						// If the eviction gets restarted, the pod needs to be removed from the cache
+						// to allow re-triggering the eviction.
+						if _, exists := oldPod.Annotations[EvictionInProgressAnnotationKey]; !exists {
+							return
+						}
+						// The annotation was removed -> remove the pod from the cache so the eviction
+						// can be requested again. In case the eviction got restarted, requesting
+						// the eviction again is expected to be a no-op. In case the eviction
+						// got terminated with no retry, requesting a new eviction is a normal
+						// operation.
+						klog.V(3).InfoS("Eviction in background canceled (annotation removed). 
Removing pod from the cache.", "annotation", EvictionInProgressAnnotationKey, "pod", klog.KObj(newPod)) + erCache.deletePod(newPod) + return + } + // Pick up the eviction in progress + if !erCache.hasPod(newPod) { + klog.V(3).InfoS("Eviction in background detected. Updating the cache.", "pod", klog.KObj(newPod)) + } + erCache.addPod(newPod) + }, + DeleteFunc: func(obj interface{}) { + var pod *v1.Pod + switch t := obj.(type) { + case *v1.Pod: + pod = t + case cache.DeletedFinalStateUnknown: + var ok bool + pod, ok = t.Obj.(*v1.Pod) + if !ok { + klog.ErrorS(nil, "Cannot convert to *v1.Pod", "obj", t.Obj) + return + } + default: + klog.ErrorS(nil, "Cannot convert to *v1.Pod", "obj", t) + return + } + if erCache.hasPod(pod) { + klog.V(3).InfoS("Pod with eviction in background deleted/evicted. Removing pod from the cache.", "pod", klog.KObj(pod)) + } + erCache.deletePod(pod) + }, + }, + ) + if err != nil { + return nil, fmt.Errorf("unable to register event handler for pod evictor: %v", err) + } + + podEvictor.registeredHandlers = append(podEvictor.registeredHandlers, handlerRegistration) + + go erCache.run(ctx) + + podEvictor.erCache = erCache + } + + return podEvictor, nil +} + +// WaitForEventHandlersSync waits for EventHandlers to sync. +// It returns true if it was successful, false if the controller should shut down +func (pe *PodEvictor) WaitForEventHandlersSync(ctx context.Context) error { + return wait.PollUntilContextCancel(ctx, syncedPollPeriod, true, func(ctx context.Context) (done bool, err error) { + for _, handler := range pe.registeredHandlers { + if !handler.HasSynced() { + return false, nil + } + } + return true, nil + }) } // NodeEvicted gives a number of pods evicted for node func (pe *PodEvictor) NodeEvicted(node *v1.Node) uint { - pe.mu.Lock() - defer pe.mu.Unlock() + pe.mu.RLock() + defer pe.mu.RUnlock() return pe.nodePodCount[node.Name] } // TotalEvicted gives a number of pods evicted through all nodes func (pe *PodEvictor) TotalEvicted() uint { - pe.mu.Lock() - defer pe.mu.Unlock() + pe.mu.RLock() + defer pe.mu.RUnlock() return pe.totalPodCount } @@ -108,6 +399,46 @@ func (pe *PodEvictor) SetClient(client clientset.Interface) { pe.client = client } +func (pe *PodEvictor) evictionRequestsTotal() uint { + if pe.featureGates.Enabled(features.EvictionsInBackground) { + return pe.erCache.evictionRequestsTotal() + } else { + return 0 + } +} + +func (pe *PodEvictor) evictionRequestsPerNode(node string) uint { + if pe.featureGates.Enabled(features.EvictionsInBackground) { + return pe.erCache.evictionRequestsPerNode(node) + } else { + return 0 + } +} + +func (pe *PodEvictor) evictionRequestsPerNamespace(ns string) uint { + if pe.featureGates.Enabled(features.EvictionsInBackground) { + return pe.erCache.evictionRequestsPerNamespace(ns) + } else { + return 0 + } +} + +func (pe *PodEvictor) EvictionRequests(node *v1.Node) uint { + pe.mu.RLock() + defer pe.mu.RUnlock() + return pe.evictionRequestsTotal() +} + +func (pe *PodEvictor) TotalEvictionRequests() uint { + pe.mu.RLock() + defer pe.mu.RUnlock() + if pe.featureGates.Enabled(features.EvictionsInBackground) { + return pe.erCache.TotalEvictionRequests() + } else { + return 0 + } +} + // EvictOptions provides a handle for passing additional info to EvictPod type EvictOptions struct { // Reason allows for passing details about the specific eviction for logging. @@ -121,13 +452,29 @@ type EvictOptions struct { // EvictPod evicts a pod while exercising eviction limits. // Returns true when the pod is evicted on the server side. 
func (pe *PodEvictor) EvictPod(ctx context.Context, pod *v1.Pod, opts EvictOptions) error { + if len(pod.UID) == 0 { + klog.InfoS("Ignoring pod eviction due to missing UID", "pod", pod) + return fmt.Errorf("Pod %v is missing UID", klog.KObj(pod)) + } + + if pe.featureGates.Enabled(features.EvictionsInBackground) { + // eviction in background requested + if _, exists := pod.Annotations[EvictionRequestAnnotationKey]; exists { + if pe.erCache.hasPod(pod) { + klog.V(3).InfoS("Eviction in background already requested (ignoring)", "pod", klog.KObj(pod)) + return nil + } + } + } + pe.mu.Lock() defer pe.mu.Unlock() + var span trace.Span ctx, span = tracing.Tracer().Start(ctx, "EvictPod", trace.WithAttributes(attribute.String("podName", pod.Name), attribute.String("podNamespace", pod.Namespace), attribute.String("reason", opts.Reason), attribute.String("operation", tracing.EvictOperation))) defer span.End() - if pe.maxPodsToEvictTotal != nil && pe.totalPodCount+1 > *pe.maxPodsToEvictTotal { + if pe.maxPodsToEvictTotal != nil && pe.totalPodCount+pe.evictionRequestsTotal()+1 > *pe.maxPodsToEvictTotal { err := NewEvictionTotalLimitError() if pe.metricsEnabled { metrics.PodsEvicted.With(map[string]string{"result": err.Error(), "strategy": opts.StrategyName, "namespace": pod.Namespace, "node": pod.Spec.NodeName, "profile": opts.ProfileName}).Inc() @@ -138,7 +485,7 @@ func (pe *PodEvictor) EvictPod(ctx context.Context, pod *v1.Pod, opts EvictOptio } if pod.Spec.NodeName != "" { - if pe.maxPodsToEvictPerNode != nil && pe.nodePodCount[pod.Spec.NodeName]+1 > *pe.maxPodsToEvictPerNode { + if pe.maxPodsToEvictPerNode != nil && pe.nodePodCount[pod.Spec.NodeName]+pe.evictionRequestsPerNode(pod.Spec.NodeName)+1 > *pe.maxPodsToEvictPerNode { err := NewEvictionNodeLimitError(pod.Spec.NodeName) if pe.metricsEnabled { metrics.PodsEvicted.With(map[string]string{"result": err.Error(), "strategy": opts.StrategyName, "namespace": pod.Namespace, "node": pod.Spec.NodeName, "profile": opts.ProfileName}).Inc() @@ -149,17 +496,17 @@ func (pe *PodEvictor) EvictPod(ctx context.Context, pod *v1.Pod, opts EvictOptio } } - if pe.maxPodsToEvictPerNamespace != nil && pe.namespacePodCount[pod.Namespace]+1 > *pe.maxPodsToEvictPerNamespace { + if pe.maxPodsToEvictPerNamespace != nil && pe.namespacePodCount[pod.Namespace]+pe.evictionRequestsPerNamespace(pod.Namespace)+1 > *pe.maxPodsToEvictPerNamespace { err := NewEvictionNamespaceLimitError(pod.Namespace) if pe.metricsEnabled { metrics.PodsEvicted.With(map[string]string{"result": err.Error(), "strategy": opts.StrategyName, "namespace": pod.Namespace, "node": pod.Spec.NodeName, "profile": opts.ProfileName}).Inc() } span.AddEvent("Eviction Failed", trace.WithAttributes(attribute.String("node", pod.Spec.NodeName), attribute.String("err", err.Error()))) - klog.ErrorS(err, "Error evicting pod", "limit", *pe.maxPodsToEvictPerNamespace, "namespace", pod.Namespace) + klog.ErrorS(err, "Error evicting pod", "limit", *pe.maxPodsToEvictPerNamespace, "namespace", pod.Namespace, "pod", klog.KObj(pod)) return err } - err := evictPod(ctx, pe.client, pod, pe.policyGroupVersion) + ignore, err := pe.evictPod(ctx, pod) if err != nil { // err is used only for logging purposes span.AddEvent("Eviction Failed", trace.WithAttributes(attribute.String("node", pod.Spec.NodeName), attribute.String("err", err.Error()))) @@ -170,6 +517,10 @@ func (pe *PodEvictor) EvictPod(ctx context.Context, pod *v1.Pod, opts EvictOptio return err } + if ignore { + return nil + } + if pod.Spec.NodeName != "" { 
pe.nodePodCount[pod.Spec.NodeName]++
 	}
@@ -196,12 +547,13 @@ func (pe *PodEvictor) EvictPod(ctx context.Context, pod *v1.Pod, opts EvictOptio
 	return nil
 }
-func evictPod(ctx context.Context, client clientset.Interface, pod *v1.Pod, policyGroupVersion string) error {
+// return (ignore, err)
+func (pe *PodEvictor) evictPod(ctx context.Context, pod *v1.Pod) (bool, error) {
 	deleteOptions := &metav1.DeleteOptions{}
 	// GracePeriodSeconds ?
 	eviction := &policy.Eviction{
 		TypeMeta: metav1.TypeMeta{
-			APIVersion: policyGroupVersion,
+			APIVersion: pe.policyGroupVersion,
 			Kind: eutils.EvictionKind,
 		},
 		ObjectMeta: metav1.ObjectMeta{
@@ -210,13 +562,36 @@ func evictPod(ctx context.Context, client clientset.Interface, pod *v1.Pod, poli
 		},
 		DeleteOptions: deleteOptions,
 	}
-	err := client.PolicyV1().Evictions(eviction.Namespace).Evict(ctx, eviction)
+	err := pe.client.PolicyV1().Evictions(eviction.Namespace).Evict(ctx, eviction)
+	if err == nil {
+		return false, nil
+	}
+	if pe.featureGates.Enabled(features.EvictionsInBackground) {
+		// eviction in background requested
+		if _, exists := pod.Annotations[EvictionRequestAnnotationKey]; exists {
+			// Simulating https://github.com/kubevirt/kubevirt/pull/11532/files#diff-059cc1fc09e8b469143348cc3aa80b40de987670e008fa18a6fe010061f973c9R77
+			if apierrors.IsTooManyRequests(err) && strings.Contains(err.Error(), EvictionInBackgroundErrorText) {
+				// Ignore eviction of any pod that has failed or completed.
+				// It can happen that an eviction in background ends up with the pod stuck in a completed state.
+				// Normally, an eviction request is expected to end with the pod's deletion.
+				// However, some custom eviction policies may leave completed pods around,
+				// which would cause all the completed pods to still be considered unfinished evictions in background. 
+ if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed { + klog.V(3).InfoS("Ignoring eviction of a completed/failed pod", "pod", klog.KObj(pod)) + return true, nil + } + klog.V(3).InfoS("Eviction in background assumed", "pod", klog.KObj(pod)) + pe.erCache.assumePod(pod) + return true, nil + } + } + } if apierrors.IsTooManyRequests(err) { - return fmt.Errorf("error when evicting pod (ignoring) %q: %v", pod.Name, err) + return false, fmt.Errorf("error when evicting pod (ignoring) %q: %v", pod.Name, err) } if apierrors.IsNotFound(err) { - return fmt.Errorf("pod not found when evicting %q: %v", pod.Name, err) + return false, fmt.Errorf("pod not found when evicting %q: %v", pod.Name, err) } - return err + return false, err } diff --git a/pkg/descheduler/evictions/evictions_test.go b/pkg/descheduler/evictions/evictions_test.go index 475b0bc9d..6b72005ac 100644 --- a/pkg/descheduler/evictions/evictions_test.go +++ b/pkg/descheduler/evictions/evictions_test.go @@ -18,23 +18,40 @@ package evictions import ( "context" + "fmt" "testing" + "time" v1 "k8s.io/api/core/v1" + policy "k8s.io/api/policy/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" + fakeclientset "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" "k8s.io/client-go/tools/events" + "k8s.io/component-base/featuregate" + "k8s.io/klog/v2" utilptr "k8s.io/utils/ptr" + podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod" + "sigs.k8s.io/descheduler/pkg/features" "sigs.k8s.io/descheduler/pkg/utils" "sigs.k8s.io/descheduler/test" ) +func initFeatureGates() featuregate.FeatureGate { + featureGates := featuregate.NewFeatureGate() + featureGates.Add(map[featuregate.Feature]featuregate.FeatureSpec{ + features.EvictionsInBackground: {Default: true, PreRelease: featuregate.Alpha}, + }) + return featureGates +} + func TestEvictPod(t *testing.T) { - ctx := context.Background() node1 := test.BuildTestNode("node1", 1000, 2000, 9, nil) pod1 := test.BuildTestPod("p1", 400, 0, "node1", nil) tests := []struct { @@ -61,14 +78,34 @@ func TestEvictPod(t *testing.T) { } for _, test := range tests { - fakeClient := &fake.Clientset{} - fakeClient.Fake.AddReactor("list", "pods", func(action core.Action) (bool, runtime.Object, error) { - return true, &v1.PodList{Items: test.pods}, nil + t.Run(test.description, func(t *testing.T) { + ctx := context.Background() + fakeClient := &fake.Clientset{} + fakeClient.Fake.AddReactor("list", "pods", func(action core.Action) (bool, runtime.Object, error) { + return true, &v1.PodList{Items: test.pods}, nil + }) + sharedInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0) + sharedInformerFactory.Start(ctx.Done()) + sharedInformerFactory.WaitForCacheSync(ctx.Done()) + + eventRecorder := &events.FakeRecorder{} + podEvictor, err := NewPodEvictor( + ctx, + fakeClient, + eventRecorder, + sharedInformerFactory.Core().V1().Pods().Informer(), + initFeatureGates(), + NewOptions(), + ) + if err != nil { + t.Fatalf("Unexpected error when creating a pod evictor: %v", err) + } + + _, got := podEvictor.evictPod(ctx, test.pod) + if got != test.want { + t.Errorf("Test error for Desc: %s. Expected %v pod eviction to be %v, got %v", test.description, test.pod.Name, test.want, got) + } }) - got := evictPod(ctx, fakeClient, test.pod, "v1") - if got != test.want { - t.Errorf("Test error for Desc: %s. 
Expected %v pod eviction to be %v, got %v", test.description, test.pod.Name, test.want, got) - } } } @@ -118,17 +155,29 @@ func TestPodTypes(t *testing.T) { } func TestNewPodEvictor(t *testing.T) { + ctx := context.Background() + pod1 := test.BuildTestPod("pod", 400, 0, "node", nil) fakeClient := fake.NewSimpleClientset(pod1) + sharedInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0) + sharedInformerFactory.Start(ctx.Done()) + sharedInformerFactory.WaitForCacheSync(ctx.Done()) + eventRecorder := &events.FakeRecorder{} - podEvictor := NewPodEvictor( + podEvictor, err := NewPodEvictor( + ctx, fakeClient, eventRecorder, + sharedInformerFactory.Core().V1().Pods().Informer(), + initFeatureGates(), NewOptions().WithMaxPodsToEvictPerNode(utilptr.To[uint](1)), ) + if err != nil { + t.Fatalf("Unexpected error when creating a pod evictor: %v", err) + } stubNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node"}} @@ -154,7 +203,7 @@ func TestNewPodEvictor(t *testing.T) { t.Errorf("Expected 1 total evictions, got %q instead", evictions) } - err := podEvictor.EvictPod(context.TODO(), pod1, EvictOptions{}) + err = podEvictor.EvictPod(context.TODO(), pod1, EvictOptions{}) if err == nil { t.Errorf("Expected a pod eviction error, got nil instead") } @@ -165,3 +214,89 @@ func TestNewPodEvictor(t *testing.T) { t.Errorf("Expected a pod eviction EvictionNodeLimitError error, got a different error instead: %v", err) } } + +func TestEvictionRequestsCacheCleanup(t *testing.T) { + ctx := context.Background() + node1 := test.BuildTestNode("n1", 2000, 3000, 10, nil) + + ownerRef1 := test.GetReplicaSetOwnerRefList() + updatePod := func(pod *v1.Pod) { + pod.Namespace = "dev" + pod.ObjectMeta.OwnerReferences = ownerRef1 + } + updatePodWithEvictionInBackground := func(pod *v1.Pod) { + updatePod(pod) + pod.Annotations = map[string]string{ + EvictionRequestAnnotationKey: "", + } + } + + p1 := test.BuildTestPod("p1", 100, 0, node1.Name, updatePodWithEvictionInBackground) + p2 := test.BuildTestPod("p2", 100, 0, node1.Name, updatePodWithEvictionInBackground) + p3 := test.BuildTestPod("p3", 100, 0, node1.Name, updatePod) + p4 := test.BuildTestPod("p4", 100, 0, node1.Name, updatePod) + + client := fakeclientset.NewSimpleClientset(node1, p1, p2, p3, p4) + sharedInformerFactory := informers.NewSharedInformerFactory(client, 0) + _, eventRecorder := utils.GetRecorderAndBroadcaster(ctx, client) + + podEvictor, err := NewPodEvictor( + ctx, + client, + eventRecorder, + sharedInformerFactory.Core().V1().Pods().Informer(), + initFeatureGates(), + nil, + ) + if err != nil { + t.Fatalf("Unexpected error when creating a pod evictor: %v", err) + } + + client.PrependReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) { + if action.GetSubresource() == "eviction" { + createAct, matched := action.(core.CreateActionImpl) + if !matched { + return false, nil, fmt.Errorf("unable to convert action to core.CreateActionImpl") + } + if eviction, matched := createAct.Object.(*policy.Eviction); matched { + podName := eviction.GetName() + if podName == "p1" || podName == "p2" { + return true, nil, &apierrors.StatusError{ + ErrStatus: metav1.Status{ + Reason: metav1.StatusReasonTooManyRequests, + Message: "Eviction triggered evacuation", + }, + } + } + return true, nil, nil + } + } + return false, nil, nil + }) + + sharedInformerFactory.Start(ctx.Done()) + sharedInformerFactory.WaitForCacheSync(ctx.Done()) + + podEvictor.EvictPod(ctx, p1, EvictOptions{}) + podEvictor.EvictPod(ctx, p2, EvictOptions{}) + 
podEvictor.EvictPod(ctx, p3, EvictOptions{}) + podEvictor.EvictPod(ctx, p4, EvictOptions{}) + + klog.Infof("2 evictions in background expected, 2 normal evictions") + if total := podEvictor.TotalEvictionRequests(); total != 2 { + t.Fatalf("Expected %v total eviction requests, got %v instead", 2, total) + } + if total := podEvictor.TotalEvicted(); total != 2 { + t.Fatalf("Expected %v total evictions, got %v instead", 2, total) + } + + klog.Infof("2 evictions in background assumed. Wait for few seconds and check the assumed requests timed out") + time.Sleep(2 * time.Second) + klog.Infof("Checking the assumed requests timed out and were deleted") + // Set the timeout to 1s so the cleaning can be tested + podEvictor.erCache.assumedRequestTimeoutSeconds = 1 + podEvictor.erCache.cleanCache(ctx) + if totalERs := podEvictor.TotalEvictionRequests(); totalERs > 0 { + t.Fatalf("Expected 0 eviction requests, got %v instead", totalERs) + } +} diff --git a/pkg/framework/profile/profile.go b/pkg/framework/profile/profile.go index 5cac3ea27..71234c3e6 100644 --- a/pkg/framework/profile/profile.go +++ b/pkg/framework/profile/profile.go @@ -305,7 +305,8 @@ func (d profileImpl) RunDeschedulePlugins(ctx context.Context, nodes []*v1.Node) var span trace.Span ctx, span = tracing.Tracer().Start(ctx, pl.Name(), trace.WithAttributes(attribute.String("plugin", pl.Name()), attribute.String("profile", d.profileName), attribute.String("operation", tracing.DescheduleOperation))) defer span.End() - evicted := d.podEvictor.TotalEvicted() + evictedBeforeDeschedule := d.podEvictor.TotalEvicted() + evictionRequestsBeforeDeschedule := d.podEvictor.TotalEvictionRequests() strategyStart := time.Now() status := pl.Deschedule(ctx, nodes) metrics.DeschedulerStrategyDuration.With(map[string]string{"strategy": pl.Name(), "profile": d.profileName}).Observe(time.Since(strategyStart).Seconds()) @@ -314,7 +315,7 @@ func (d profileImpl) RunDeschedulePlugins(ctx context.Context, nodes []*v1.Node) span.AddEvent("Plugin Execution Failed", trace.WithAttributes(attribute.String("err", status.Err.Error()))) errs = append(errs, fmt.Errorf("plugin %q finished with error: %v", pl.Name(), status.Err)) } - klog.V(1).InfoS("Total number of pods evicted", "extension point", "Deschedule", "evictedPods", d.podEvictor.TotalEvicted()-evicted) + klog.V(1).InfoS("Total number of evictions/requests", "extension point", "Deschedule", "evictedPods", d.podEvictor.TotalEvicted()-evictedBeforeDeschedule, "evictionRequests", d.podEvictor.TotalEvictionRequests()-evictionRequestsBeforeDeschedule) } aggrErr := errors.NewAggregate(errs) @@ -333,7 +334,8 @@ func (d profileImpl) RunBalancePlugins(ctx context.Context, nodes []*v1.Node) *f var span trace.Span ctx, span = tracing.Tracer().Start(ctx, pl.Name(), trace.WithAttributes(attribute.String("plugin", pl.Name()), attribute.String("profile", d.profileName), attribute.String("operation", tracing.BalanceOperation))) defer span.End() - evicted := d.podEvictor.TotalEvicted() + evictedBeforeBalance := d.podEvictor.TotalEvicted() + evictionRequestsBeforeBalance := d.podEvictor.TotalEvictionRequests() strategyStart := time.Now() status := pl.Balance(ctx, nodes) metrics.DeschedulerStrategyDuration.With(map[string]string{"strategy": pl.Name(), "profile": d.profileName}).Observe(time.Since(strategyStart).Seconds()) @@ -342,7 +344,7 @@ func (d profileImpl) RunBalancePlugins(ctx context.Context, nodes []*v1.Node) *f span.AddEvent("Plugin Execution Failed", trace.WithAttributes(attribute.String("err", status.Err.Error()))) errs 
= append(errs, fmt.Errorf("plugin %q finished with error: %v", pl.Name(), status.Err))
 }
- klog.V(1).InfoS("Total number of pods evicted", "extension point", "Balance", "evictedPods", d.podEvictor.TotalEvicted()-evicted)
+ klog.V(1).InfoS("Total number of evictions/requests", "extension point", "Balance", "evictedPods", d.podEvictor.TotalEvicted()-evictedBeforeBalance, "evictionRequests", d.podEvictor.TotalEvictionRequests()-evictionRequestsBeforeBalance)
 }

 aggrErr := errors.NewAggregate(errs)
diff --git a/pkg/framework/testing/utils.go b/pkg/framework/testing/utils.go
index da982ece5..f5b4fe7d6 100644
--- a/pkg/framework/testing/utils.go
+++ b/pkg/framework/testing/utils.go
@@ -7,10 +7,12 @@ import (
 v1 "k8s.io/api/core/v1"
 "k8s.io/client-go/informers"
 "k8s.io/client-go/tools/events"
+ "k8s.io/component-base/featuregate"
 clientset "k8s.io/client-go/kubernetes"
 "sigs.k8s.io/descheduler/pkg/descheduler/evictions"
 podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
+ "sigs.k8s.io/descheduler/pkg/features"
 frameworkfake "sigs.k8s.io/descheduler/pkg/framework/fake"
 "sigs.k8s.io/descheduler/pkg/framework/plugins/defaultevictor"
 frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
@@ -44,7 +46,14 @@ func InitFrameworkHandle(
 sharedInformerFactory.Start(ctx.Done())
 sharedInformerFactory.WaitForCacheSync(ctx.Done())
 eventRecorder := &events.FakeRecorder{}
- podEvictor := evictions.NewPodEvictor(client, eventRecorder, evictionOptions)
+ featureGates := featuregate.NewFeatureGate()
+ featureGates.Add(map[featuregate.Feature]featuregate.FeatureSpec{
+ features.EvictionsInBackground: {Default: false, PreRelease: featuregate.Alpha},
+ })
+ podEvictor, err := evictions.NewPodEvictor(ctx, client, eventRecorder, podInformer, featureGates, evictionOptions)
+ if err != nil {
+ return nil, nil, fmt.Errorf("Unable to initialize pod evictor: %v", err)
+ }
 evictorFilter, err := defaultevictor.New(
 &defaultEvictorArgs,
 &frameworkfake.HandleImpl{
diff --git a/test/e2e/e2e_evictioninbackground_test.go b/test/e2e/e2e_evictioninbackground_test.go
new file mode 100644
index 000000000..839d20806
--- /dev/null
+++ b/test/e2e/e2e_evictioninbackground_test.go
@@ -0,0 +1,542 @@
+package e2e
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "strings"
+ "testing"
+ "time"
+
+ appsv1 "k8s.io/api/apps/v1"
+ corev1 "k8s.io/api/core/v1"
+ v1 "k8s.io/api/core/v1"
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/api/resource"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/wait"
+ clientset "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/tools/clientcmd"
+ componentbaseconfig "k8s.io/component-base/config"
+ "k8s.io/klog/v2"
+ utilptr "k8s.io/utils/ptr"
+
+ kvcorev1 "kubevirt.io/api/core/v1"
+ generatedclient "kubevirt.io/client-go/generated/kubevirt/clientset/versioned"
+
+ "sigs.k8s.io/descheduler/pkg/api"
+ apiv1alpha2 "sigs.k8s.io/descheduler/pkg/api/v1alpha2"
+ "sigs.k8s.io/descheduler/pkg/descheduler/client"
+ "sigs.k8s.io/descheduler/pkg/framework/plugins/defaultevictor"
+ "sigs.k8s.io/descheduler/pkg/framework/plugins/podlifetime"
+)
+
+const (
+ vmiCount = 3
+)
+
+func virtualMachineInstance(idx int) *kvcorev1.VirtualMachineInstance {
+ return &kvcorev1.VirtualMachineInstance{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: fmt.Sprintf("kubevirtvmi-%v", idx),
+ Annotations: map[string]string{
+ "descheduler.alpha.kubernetes.io/request-evict-only": "",
+ },
+ },
+ Spec: kvcorev1.VirtualMachineInstanceSpec{
+ EvictionStrategy: utilptr.To[kvcorev1.EvictionStrategy](kvcorev1.EvictionStrategyLiveMigrate),
+ Domain: kvcorev1.DomainSpec{
+ Devices: kvcorev1.Devices{
+ AutoattachPodInterface: utilptr.To[bool](false),
+ Disks: []kvcorev1.Disk{
+ {
+ Name: "containerdisk",
+ DiskDevice: kvcorev1.DiskDevice{
+ Disk: &kvcorev1.DiskTarget{
+ Bus: kvcorev1.DiskBusVirtio,
+ },
+ },
+ },
+ {
+ Name: "cloudinitdisk",
+ DiskDevice: kvcorev1.DiskDevice{
+ Disk: &kvcorev1.DiskTarget{
+ Bus: kvcorev1.DiskBusVirtio,
+ },
+ },
+ },
+ },
+ Rng: &kvcorev1.Rng{},
+ },
+ Resources: kvcorev1.ResourceRequirements{
+ Requests: v1.ResourceList{
+ v1.ResourceMemory: resource.MustParse("1024M"),
+ },
+ },
+ },
+ TerminationGracePeriodSeconds: utilptr.To[int64](0),
+ Volumes: []kvcorev1.Volume{
+ {
+ Name: "containerdisk",
+ VolumeSource: kvcorev1.VolumeSource{
+ ContainerDisk: &kvcorev1.ContainerDiskSource{
+ Image: "quay.io/kubevirt/fedora-with-test-tooling-container-disk:20240710_1265d1090",
+ },
+ },
+ },
+ {
+ Name: "cloudinitdisk",
+ VolumeSource: kvcorev1.VolumeSource{
+ CloudInitNoCloud: &kvcorev1.CloudInitNoCloudSource{
+ UserData: `#cloud-config
+password: fedora
+chpasswd: { expire: False }
+packages:
+  - nginx
+runcmd:
+  - [ "systemctl", "enable", "--now", "nginx" ]`,
+ NetworkData: `version: 2
+ethernets:
+  eth0:
+    addresses: [ fd10:0:2::2/120 ]
+    dhcp4: true
+    gateway6: fd10:0:2::1`,
+ },
+ },
+ },
+ },
+ },
+ }
+}
+
+func waitForKubevirtReady(t *testing.T, ctx context.Context, kvClient generatedclient.Interface) {
+ obj, err := kvClient.KubevirtV1().KubeVirts("kubevirt").Get(ctx, "kubevirt", metav1.GetOptions{})
+ if err != nil {
+ t.Fatalf("Unable to get kubevirt/kubevirt: %v", err)
+ }
+ available := false
+ for _, condition := range obj.Status.Conditions {
+ if condition.Type == kvcorev1.KubeVirtConditionAvailable {
+ if condition.Status == corev1.ConditionTrue {
+ available = true
+ }
+ }
+ }
+ if !available {
+ t.Fatalf("Kubevirt is not available")
+ }
+ klog.Infof("Kubevirt is available")
+}
+
+func allVMIsHaveRunningPods(t *testing.T, ctx context.Context, kubeClient clientset.Interface, kvClient generatedclient.Interface) (bool, error) {
+ klog.Infof("Checking all vmi active pods are running")
+ uidMap := make(map[types.UID]*corev1.Pod)
+ podList, err := kubeClient.CoreV1().Pods("default").List(ctx, metav1.ListOptions{})
+ if err != nil {
+ if strings.Contains(err.Error(), "client rate limiter") {
+ klog.Infof("Unable to list pods: %v", err)
+ return false, nil
+ }
+ klog.Infof("Unable to list pods: %v", err)
+ return false, err
+ }
+
+ for _, item := range podList.Items {
+ pod := item
+ klog.Infof("item: %#v\n", item.UID)
+ uidMap[item.UID] = &pod
+ }
+
+ vmiList, err := kvClient.KubevirtV1().VirtualMachineInstances("default").List(ctx, metav1.ListOptions{})
+ if err != nil {
+ klog.Infof("Unable to list VMIs: %v", err)
+ return false, err
+ }
+ if len(vmiList.Items) != vmiCount {
+ klog.Infof("Expected %v VMIs, got %v instead", vmiCount, len(vmiList.Items))
+ return false, nil
+ }
+
+ for _, item := range vmiList.Items {
+ atLeastOneVmiIsRunning := false
+ for activePod := range item.Status.ActivePods {
+ if _, exists := uidMap[activePod]; !exists {
+ klog.Infof("Active pod %v not found", activePod)
+ return false, nil
+ }
+ klog.Infof("Checking whether active pod %v (uid=%v) is running", uidMap[activePod].Name, activePod)
+ // ignore completed/failed pods
+ if uidMap[activePod].Status.Phase == corev1.PodFailed || uidMap[activePod].Status.Phase == corev1.PodSucceeded {
+ klog.Infof("Ignoring active pod %v, phase=%v", uidMap[activePod].Name, uidMap[activePod].Status.Phase)
+ continue
+ }
+ if uidMap[activePod].Status.Phase != corev1.PodRunning {
+ klog.Infof("activePod %v is not running: %v\n", uidMap[activePod].Name, uidMap[activePod].Status.Phase)
+ return false, nil
+ }
+ atLeastOneVmiIsRunning = true
+ }
+ if !atLeastOneVmiIsRunning {
+ klog.Infof("vmi %v does not have any activePod running\n", item.Name)
+ return false, nil
+ }
+ }
+
+ return true, nil
+}
+
+func podLifeTimePolicy() *apiv1alpha2.DeschedulerPolicy {
+ return &apiv1alpha2.DeschedulerPolicy{
+ Profiles: []apiv1alpha2.DeschedulerProfile{
+ {
+ Name: "KubeVirtPodLifetimeProfile",
+ PluginConfigs: []apiv1alpha2.PluginConfig{
+ {
+ Name: podlifetime.PluginName,
+ Args: runtime.RawExtension{
+ Object: &podlifetime.PodLifeTimeArgs{
+ MaxPodLifeTimeSeconds: utilptr.To[uint](1), // set it to immediate eviction
+ Namespaces: &api.Namespaces{
+ Include: []string{"default"},
+ },
+ },
+ },
+ },
+ {
+ Name: defaultevictor.PluginName,
+ Args: runtime.RawExtension{
+ Object: &defaultevictor.DefaultEvictorArgs{
+ EvictLocalStoragePods: true,
+ },
+ },
+ },
+ },
+ Plugins: apiv1alpha2.Plugins{
+ Filter: apiv1alpha2.PluginSet{
+ Enabled: []string{
+ defaultevictor.PluginName,
+ },
+ },
+ Deschedule: apiv1alpha2.PluginSet{
+ Enabled: []string{
+ podlifetime.PluginName,
+ },
+ },
+ },
+ },
+ },
+ }
+}
+
+func kVirtRunningPodNames(t *testing.T, ctx context.Context, kubeClient clientset.Interface) []string {
+ names := []string{}
+ if err := wait.PollUntilContextTimeout(ctx, 5*time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) {
+ podList, err := kubeClient.CoreV1().Pods("default").List(ctx, metav1.ListOptions{})
+ if err != nil {
+ if isClientRateLimiterError(err) {
+ t.Log(err)
+ return false, nil
+ }
+ klog.Infof("Unable to list pods: %v", err)
+ return false, err
+ }
+
+ for _, item := range podList.Items {
+ if !strings.HasPrefix(item.Name, "virt-launcher-kubevirtvmi-") {
+ t.Fatalf("Only pod names with 'virt-launcher-kubevirtvmi-' prefix are expected, got %q instead", item.Name)
+ }
+ if item.Status.Phase == corev1.PodRunning {
+ names = append(names, item.Name)
+ }
+ }
+
+ return true, nil
+ }); err != nil {
+ t.Fatalf("Unable to list running kvirt pod names: %v", err)
+ }
+ return names
+}
+
+func observeLiveMigration(t *testing.T, ctx context.Context, kubeClient clientset.Interface, usedRunningPodNames map[string]struct{}) {
+ prevTotal := uint(0)
+ jumps := 0
+ // keep running the descheduling cycle until the migration is triggered and completed a few times or times out
+ for i := 0; i < 240; i++ {
+ // monitor how many pods get evicted
+ names := kVirtRunningPodNames(t, ctx, kubeClient)
+ klog.Infof("vmi pods: %#v\n", names)
+ // The number of pods needs to be kept between vmiCount and vmiCount+1.
+ // At most two pods are expected to have virt-launcher-kubevirtvmi-X prefix name in common.
+ prefixes := make(map[string]uint)
+ for _, name := range names {
+ // "virt-launcher-kubevirtvmi-"
+ str := strings.Split(name, "-")[4]
+ prefixes[str]++
+ usedRunningPodNames[name] = struct{}{}
+ }
+
+ hasDouble := false
+ total := uint(0)
+ for idx, count := range prefixes {
+ total += count
+ if count > 2 {
+ t.Fatalf("A vmi kubevirtvmi-%v has more than 2 running active pods (%v), not expected", idx, count)
+ }
+ if count == 2 {
+ if !hasDouble {
+ hasDouble = true
+ continue
+ }
+ t.Fatalf("Another vmi with 2 running active pods, not expected")
+ }
+ }
+ // The total sum cannot be higher than vmiCount+1
+ if total > vmiCount+1 {
+ t.Fatalf("Total running pods (%v) are higher than expected vmiCount+1 (%v)", total, vmiCount+1)
+ }
+
+ if prevTotal != 0 && prevTotal != total {
+ jumps++
+ }
+ // Expect at least 3 finished live migrations (two should be enough as well, though ...)
+ if jumps >= 6 {
+ break
+ }
+ prevTotal = total
+ time.Sleep(time.Second)
+ }
+
+ if jumps < 6 {
+ podList, err := kubeClient.CoreV1().Pods("default").List(ctx, metav1.ListOptions{})
+ if err != nil {
+ klog.Infof("Unable to list pods: %v", err)
+ } else {
+ for _, item := range podList.Items {
+ klog.Infof("pod(%v): %#v", item.Name, item)
+ }
+ }
+
+ t.Fatalf("Expected at least 3 finished live migrations, got less: %v", jumps/2)
+ }
+ klog.Infof("The live migration finished at least 3 times")
+
+ // len(usedRunningPodNames) is expected to be vmiCount + jumps/2 + 1 (one more live migration could still be initiated)
+ klog.Infof("len(usedRunningPodNames): %v, upper limit: %v\n", len(usedRunningPodNames), vmiCount+jumps/2+1)
+ if len(usedRunningPodNames) > vmiCount+jumps/2+1 {
+ t.Fatalf("Expected vmiCount + jumps/2 + 1 = %v running pods, got %v instead", vmiCount+jumps/2+1, len(usedRunningPodNames))
+ }
+
+ if err := wait.PollUntilContextTimeout(ctx, 5*time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) {
+ names := kVirtRunningPodNames(t, ctx, kubeClient)
+ klog.Infof("vmi pods: %#v\n", names)
+ lNames := len(names)
+ if lNames != vmiCount {
+ klog.Infof("Waiting for the number of running vmi pods to be %v, got %v instead", vmiCount, lNames)
+ return false, nil
+ }
+ klog.Infof("The number of running vmi pods is %v as expected", vmiCount)
+ return true, nil
+ }); err != nil {
+ t.Fatalf("Error waiting for %v vmi active pods to be running: %v", vmiCount, err)
+ }
+}
+
+func createAndWaitForDeschedulerRunning(t *testing.T, ctx context.Context, kubeClient clientset.Interface, deschedulerDeploymentObj *appsv1.Deployment) string {
+ klog.Infof("Creating descheduler deployment %v", deschedulerDeploymentObj.Name)
+ _, err := kubeClient.AppsV1().Deployments(deschedulerDeploymentObj.Namespace).Create(ctx, deschedulerDeploymentObj, metav1.CreateOptions{})
+ if err != nil {
+ t.Fatalf("Error creating %q deployment: %v", deschedulerDeploymentObj.Name, err)
+ }
+
+ klog.Infof("Waiting for the descheduler pod running")
+ deschedulerPods := waitForPodsRunning(ctx, t, kubeClient, deschedulerDeploymentObj.Labels, 1, deschedulerDeploymentObj.Namespace)
+ if len(deschedulerPods) == 0 {
+ t.Fatalf("Error waiting for %q deployment: no running pod found", deschedulerDeploymentObj.Name)
+ }
+ return deschedulerPods[0].Name
+}
+
+func updateDeschedulerPolicy(t *testing.T, ctx context.Context, kubeClient clientset.Interface, policy *apiv1alpha2.DeschedulerPolicy) {
+ deschedulerPolicyConfigMapObj, err := deschedulerPolicyConfigMap(policy)
+ if err != nil {
+ t.Fatalf("Error generating %q CM: %v", deschedulerPolicyConfigMapObj.Name, err)
+ }
+ _, err = kubeClient.CoreV1().ConfigMaps(deschedulerPolicyConfigMapObj.Namespace).Update(ctx, deschedulerPolicyConfigMapObj, metav1.UpdateOptions{})
+ if err != nil {
+ t.Fatalf("Error updating %q CM: %v", deschedulerPolicyConfigMapObj.Name, err)
+ }
+}
+
+func createKubevirtClient() (generatedclient.Interface, error) {
+ loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
+ loadingRules.DefaultClientConfig = &clientcmd.DefaultClientConfig
+ overrides := &clientcmd.ConfigOverrides{}
+ clientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, overrides)
+
+ config, err := clientConfig.ClientConfig()
+ if err != nil {
+ return nil, err
+ }
+ config.GroupVersion = &kvcorev1.StorageGroupVersion
+ config.APIPath = "/apis"
+ config.ContentType = runtime.ContentTypeJSON
+
+ return generatedclient.NewForConfig(config)
+}
+
+func TestLiveMigrationInBackground(t *testing.T) {
+ initPluginRegistry()
+
+ ctx := context.Background()
+
+ kubeClient, err := client.CreateClient(componentbaseconfig.ClientConnectionConfiguration{Kubeconfig: os.Getenv("KUBECONFIG")}, "")
+ if err != nil {
+ t.Fatalf("Error during kubernetes client creation with %v", err)
+ }
+
+ kvClient, err := createKubevirtClient()
+ if err != nil {
+ t.Fatalf("Error during kvClient creation with %v", err)
+ }
+
+ waitForKubevirtReady(t, ctx, kvClient)
+
+ // Delete all VMIs
+ defer func() {
+ for i := 1; i <= vmiCount; i++ {
+ vmi := virtualMachineInstance(i)
+ err := kvClient.KubevirtV1().VirtualMachineInstances("default").Delete(context.Background(), vmi.Name, metav1.DeleteOptions{})
+ if err != nil && !apierrors.IsNotFound(err) {
+ klog.Infof("Unable to delete vmi %v: %v", vmi.Name, err)
+ }
+ }
+ wait.PollUntilContextTimeout(ctx, 5*time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) {
+ podList, err := kubeClient.CoreV1().Pods("default").List(ctx, metav1.ListOptions{})
+ if err != nil {
+ return false, err
+ }
+ lPods := len(podList.Items)
+ if lPods > 0 {
+ klog.Infof("Waiting until all pods under default namespace are gone, %v remaining", lPods)
+ return false, nil
+ }
+ return true, nil
+ })
+ }()
+
+ // Create N vmis and wait for the corresponding vm pods to be ready and running
+ for i := 1; i <= vmiCount; i++ {
+ vmi := virtualMachineInstance(i)
+ _, err = kvClient.KubevirtV1().VirtualMachineInstances("default").Create(context.Background(), vmi, metav1.CreateOptions{})
+ if err != nil {
+ t.Fatalf("Unable to create KubeVirt vmi: %v\n", err)
+ }
+ }
+
+ // Wait until all VMIs have running pods
+ if err := wait.PollUntilContextTimeout(ctx, 5*time.Second, 300*time.Second, true, func(ctx context.Context) (bool, error) {
+ return allVMIsHaveRunningPods(t, ctx, kubeClient, kvClient)
+ }); err != nil {
+ t.Fatalf("Error waiting for all vmi active pods to be running: %v", err)
+ }
+
+ usedRunningPodNames := make(map[string]struct{})
+ // vmiCount number of names is expected
+ names := kVirtRunningPodNames(t, ctx, kubeClient)
+ klog.Infof("vmi pods: %#v\n", names)
+ if len(names) != vmiCount {
+ t.Fatalf("Expected %v vmi pods, got %v instead", vmiCount, len(names))
+ }
+ for _, name := range names {
+ usedRunningPodNames[name] = struct{}{}
+ }
+
+ policy := podLifeTimePolicy()
+ // Allow only a single eviction simultaneously
+ policy.MaxNoOfPodsToEvictPerNamespace = utilptr.To[uint](1)
+ // Deploy the descheduler with the configured policy
+ deschedulerPolicyConfigMapObj, err := deschedulerPolicyConfigMap(policy)
+ if err != nil {
+ t.Fatalf("Error creating %q CM: %v", deschedulerPolicyConfigMapObj.Name, err)
+ }
+ klog.Infof("Creating %q policy CM with PodLifeTime configured...", deschedulerPolicyConfigMapObj.Name)
+ _, err = kubeClient.CoreV1().ConfigMaps(deschedulerPolicyConfigMapObj.Namespace).Create(ctx, deschedulerPolicyConfigMapObj, metav1.CreateOptions{})
+ if err != nil {
+ t.Fatalf("Error creating %q CM: %v", deschedulerPolicyConfigMapObj.Name, err)
+ }
+ defer func() {
+ klog.Infof("Deleting %q CM...", deschedulerPolicyConfigMapObj.Name)
+ err = kubeClient.CoreV1().ConfigMaps(deschedulerPolicyConfigMapObj.Namespace).Delete(ctx, deschedulerPolicyConfigMapObj.Name, metav1.DeleteOptions{})
+ if err != nil {
+ t.Fatalf("Unable to delete %q CM: %v", deschedulerPolicyConfigMapObj.Name, err)
+ }
+ }()
+
+ deschedulerDeploymentObj := deschedulerDeployment("kube-system")
+ // Set the descheduling interval to 10s
+ deschedulerDeploymentObj.Spec.Template.Spec.Containers[0].Args = []string{"--policy-config-file", "/policy-dir/policy.yaml", "--descheduling-interval", "10s", "--v", "4", "--feature-gates", "EvictionsInBackground=true"}
+
+ deschedulerPodName := ""
+ defer func() {
+ if deschedulerPodName != "" {
+ printPodLogs(ctx, t, kubeClient, deschedulerPodName)
+ }
+
+ klog.Infof("Deleting %q deployment...", deschedulerDeploymentObj.Name)
+ err = kubeClient.AppsV1().Deployments(deschedulerDeploymentObj.Namespace).Delete(ctx, deschedulerDeploymentObj.Name, metav1.DeleteOptions{})
+ if err != nil {
+ if apierrors.IsNotFound(err) {
+ return
+ }
+ t.Fatalf("Unable to delete %q deployment: %v", deschedulerDeploymentObj.Name, err)
+ }
+ waitForPodsToDisappear(ctx, t, kubeClient, deschedulerDeploymentObj.Labels, deschedulerDeploymentObj.Namespace)
+ }()
+
+ deschedulerPodName = createAndWaitForDeschedulerRunning(t, ctx, kubeClient, deschedulerDeploymentObj)
+
+ observeLiveMigration(t, ctx, kubeClient, usedRunningPodNames)
+
+ printPodLogs(ctx, t, kubeClient, deschedulerPodName)
+
+ klog.Infof("Deleting the current descheduler pod")
+ err = kubeClient.AppsV1().Deployments(deschedulerDeploymentObj.Namespace).Delete(ctx, deschedulerDeploymentObj.Name, metav1.DeleteOptions{})
+ if err != nil {
+ t.Fatalf("Error deleting %q deployment: %v", deschedulerDeploymentObj.Name, err)
+ }
+
+ remainingPods := make(map[string]struct{})
+ for _, name := range kVirtRunningPodNames(t, ctx, kubeClient) {
+ remainingPods[name] = struct{}{}
+ }
+
+ klog.Infof("Configuring the descheduler policy %v for PodLifetime with no limits", deschedulerPolicyConfigMapObj.Name)
+ policy.MaxNoOfPodsToEvictPerNamespace = nil
+ updateDeschedulerPolicy(t, ctx, kubeClient, policy)
+
+ deschedulerDeploymentObj = deschedulerDeployment("kube-system")
+ deschedulerDeploymentObj.Spec.Template.Spec.Containers[0].Args = []string{"--policy-config-file", "/policy-dir/policy.yaml", "--descheduling-interval", "100m", "--v", "4", "--feature-gates", "EvictionsInBackground=true"}
+ deschedulerPodName = createAndWaitForDeschedulerRunning(t, ctx, kubeClient, deschedulerDeploymentObj)
+
+ klog.Infof("Waiting until all pods are evicted (no limit set)")
+ if err := wait.PollUntilContextTimeout(ctx, 5*time.Second, 120*time.Second, true, func(ctx context.Context) (bool, error) {
+ names := kVirtRunningPodNames(t, ctx, kubeClient)
+ for _, name := range names {
+ if _, exists := remainingPods[name]; exists {
+ klog.Infof("Waiting for %v to disappear", name)
+ return false, nil
+ }
+ }
+ lNames := len(names)
+ if lNames != vmiCount {
+ klog.Infof("Waiting for the number of newly running vmi pods to be %v, got %v instead", vmiCount, lNames)
+ return false, nil
+ }
+ klog.Infof("The number of newly running vmi pods is %v as expected", vmiCount)
+ return true, nil
+ }); err != nil {
+ t.Fatalf("Error waiting for %v new vmi active pods to be running: %v", vmiCount, err)
+ }
+}
diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go
index ad3639fd6..736d19f89 100644
--- a/test/e2e/e2e_test.go
+++ b/test/e2e/e2e_test.go
@@ -40,6 +40,7 @@ import (
 clientset "k8s.io/client-go/kubernetes"
 listersv1 "k8s.io/client-go/listers/core/v1"
 componentbaseconfig "k8s.io/component-base/config"
+ "k8s.io/component-base/featuregate"
 "k8s.io/klog/v2"
 utilptr "k8s.io/utils/ptr"
 "sigs.k8s.io/yaml"
@@ -54,6 +55,7 @@ import (
 eutils "sigs.k8s.io/descheduler/pkg/descheduler/evictions/utils"
 nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
 podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
+ "sigs.k8s.io/descheduler/pkg/features"
 "sigs.k8s.io/descheduler/pkg/framework/pluginregistry"
 "sigs.k8s.io/descheduler/pkg/framework/plugins/defaultevictor"
 "sigs.k8s.io/descheduler/pkg/framework/plugins/nodeutilization"
@@ -68,6 +70,14 @@ func isClientRateLimiterError(err error) bool {
 return strings.Contains(err.Error(), "client rate limiter")
 }

+func initFeatureGates() featuregate.FeatureGate {
+ featureGates := featuregate.NewFeatureGate()
+ featureGates.Add(map[featuregate.Feature]featuregate.FeatureSpec{
+ features.EvictionsInBackground: {Default: false, PreRelease: featuregate.Alpha},
+ })
+ return featureGates
+}
+
 func deschedulerPolicyConfigMap(policy *deschedulerapiv1alpha2.DeschedulerPolicy) (*v1.ConfigMap, error) {
 cm := &v1.ConfigMap{
 ObjectMeta: metav1.ObjectMeta{
@@ -1344,6 +1354,7 @@ func TestDeschedulingInterval(t *testing.T) {
 t.Fatalf("Unable to initialize server: %v", err)
 }
 s.Client = clientSet
+ s.DefaultFeatureGates = initFeatureGates()

 deschedulerPolicy := &deschedulerapi.DeschedulerPolicy{}
diff --git a/test/e2e/e2e_toomanyrestarts_test.go b/test/e2e/e2e_toomanyrestarts_test.go
index 6cdb2cfdd..ba9f9d4d8 100644
--- a/test/e2e/e2e_toomanyrestarts_test.go
+++ b/test/e2e/e2e_toomanyrestarts_test.go
@@ -148,6 +148,7 @@ func TestTooManyRestarts(t *testing.T) {
 }
 rs.Client = clientSet
 rs.EventClient = clientSet
+ rs.DefaultFeatureGates = initFeatureGates()

 preRunNames := sets.NewString(getCurrentPodNames(ctx, clientSet, testNamespace.Name, t)...)
 // Deploy the descheduler with the configured policy
diff --git a/test/e2e/test_clientconnection_test.go b/test/e2e/test_clientconnection_test.go
index bb5c92b9b..c7afc57fe 100644
--- a/test/e2e/test_clientconnection_test.go
+++ b/test/e2e/test_clientconnection_test.go
@@ -31,6 +31,7 @@ func TestClientConnectionConfiguration(t *testing.T) {
 t.Fatalf("Unable to initialize server: %v", err)
 }
 s.Client = clientSet
+ s.DefaultFeatureGates = initFeatureGates()
 evictionPolicyGroupVersion, err := eutils.SupportEviction(s.Client)
 if err != nil || len(evictionPolicyGroupVersion) == 0 {
 t.Errorf("Error when checking support for eviction: %v", err)
diff --git a/test/run-e2e-tests.sh b/test/run-e2e-tests.sh
index 6856df2f3..65ebe1527 100755
--- a/test/run-e2e-tests.sh
+++ b/test/run-e2e-tests.sh
@@ -21,6 +21,12 @@ set -o nounset
 # Set to empty if unbound/empty
 SKIP_INSTALL=${SKIP_INSTALL:-}
 KIND_E2E=${KIND_E2E:-}
+CONTAINER_ENGINE=${CONTAINER_ENGINE:-docker}
+KIND_SUDO=${KIND_SUDO:-}
+SKIP_KUBECTL_INSTALL=${SKIP_KUBECTL_INSTALL:-}
+SKIP_KIND_INSTALL=${SKIP_KIND_INSTALL:-}
+SKIP_KUBEVIRT_INSTALL=${SKIP_KUBEVIRT_INSTALL:-}
+KUBEVIRT_VERSION=${KUBEVIRT_VERSION:-v1.3.0-rc.1}

 # Build a descheduler image
 IMAGE_TAG=v$(date +%Y%m%d)-$(git describe --tags)
@@ -32,20 +38,35 @@ echo "DESCHEDULER_IMAGE: ${DESCHEDULER_IMAGE}"

 # This just runs e2e tests.
 if [ -n "$KIND_E2E" ]; then
- # If we did not set SKIP_INSTALL
- if [ -z "$SKIP_INSTALL" ]; then
- K8S_VERSION=${KUBERNETES_VERSION:-v1.31.0}
+ K8S_VERSION=${KUBERNETES_VERSION:-v1.31.0}
+ if [ -z "${SKIP_KUBECTL_INSTALL}" ]; then
 curl -Lo kubectl https://dl.k8s.io/release/${K8S_VERSION}/bin/linux/amd64/kubectl && chmod +x kubectl && mv kubectl /usr/local/bin/
+ fi
+ if [ -z "${SKIP_KIND_INSTALL}" ]; then
 wget https://github.com/kubernetes-sigs/kind/releases/download/v0.24.0/kind-linux-amd64
 chmod +x kind-linux-amd64
 mv kind-linux-amd64 kind
 export PATH=$PATH:$PWD
- kind create cluster --image kindest/node:${K8S_VERSION} --config=./hack/kind_config.yaml
 fi
- ${CONTAINER_ENGINE:-docker} pull registry.k8s.io/pause
- kind load docker-image registry.k8s.io/pause
- kind load docker-image ${DESCHEDULER_IMAGE}
- kind get kubeconfig > /tmp/admin.conf
+
+ # If we did not set SKIP_INSTALL
+ if [ -z "$SKIP_INSTALL" ]; then
+ ${KIND_SUDO} kind create cluster --image kindest/node:${K8S_VERSION} --config=./hack/kind_config.yaml
+ fi
+ ${CONTAINER_ENGINE} pull registry.k8s.io/pause
+ if [ "${CONTAINER_ENGINE}" == "podman" ]; then
+ podman save registry.k8s.io/pause -o /tmp/pause.tar
+ ${KIND_SUDO} kind load image-archive /tmp/pause.tar
+ rm /tmp/pause.tar
+ podman save ${DESCHEDULER_IMAGE} -o /tmp/descheduler.tar
+ ${KIND_SUDO} kind load image-archive /tmp/descheduler.tar
+ rm /tmp/descheduler.tar
+ else
+ ${KIND_SUDO} kind load docker-image registry.k8s.io/pause
+ ${KIND_SUDO} kind load docker-image ${DESCHEDULER_IMAGE}
+ fi
+ ${KIND_SUDO} kind get kubeconfig > /tmp/admin.conf
+ export KUBECONFIG="/tmp/admin.conf"
 mkdir -p ~/gopath/src/sigs.k8s.io/
 fi
@@ -53,5 +74,30 @@ fi

 # Deploy rbac, sa and binding for a descheduler running through a deployment
 kubectl apply -f kubernetes/base/rbac.yaml
+collect_logs() {
+ echo "Collecting pods and logs"
+ kubectl get pods -n default
+ kubectl get pods -n kubevirt
+
+ for pod in $(kubectl get pods -n default -o name); do
+ echo "Logs for ${pod}"
+ kubectl logs -n default ${pod}
+ done
+
+ for pod in $(kubectl get pods -n kubevirt -o name); do
+ echo "Logs for ${pod}"
+ kubectl logs -n kubevirt ${pod}
+ done
+}
+
+trap "collect_logs" ERR
+
+if [ -z "${SKIP_KUBEVIRT_INSTALL}" ]; then
+ kubectl create -f https://github.com/kubevirt/kubevirt/releases/download/${KUBEVIRT_VERSION}/kubevirt-operator.yaml
+ kubectl create -f https://github.com/kubevirt/kubevirt/releases/download/${KUBEVIRT_VERSION}/kubevirt-cr.yaml
+ kubectl wait --timeout=180s --for=condition=Available -n kubevirt kv/kubevirt
+ kubectl -n kubevirt patch kubevirt kubevirt --type=merge --patch '{"spec":{"configuration":{"developerConfiguration":{"useEmulation":true}}}}'
+fi
+
 PRJ_PREFIX="sigs.k8s.io/descheduler"
 go test ${PRJ_PREFIX}/test/e2e/ -v -timeout 0
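Reviewer note (not part of the patch): a minimal sketch of how the new KubeVirt-backed e2e test could be exercised locally through the updated test/run-e2e-tests.sh, using only the knobs the script introduces above. The concrete values (podman, sudo, the kubeconfig path) are assumptions about a particular local setup, and `-run TestLiveMigrationInBackground` is the standard `go test` selector rather than anything the script itself sets.

  # assumes the descheduler image is already built and a kind cluster with KubeVirt is already reachable,
  # so every install step the script gates behind SKIP_* is skipped
  export KIND_E2E=1 SKIP_INSTALL=1 SKIP_KUBECTL_INSTALL=1 SKIP_KIND_INSTALL=1 SKIP_KUBEVIRT_INSTALL=1
  export CONTAINER_ENGINE=podman KIND_SUDO=sudo
  export KUBECONFIG=/tmp/admin.conf
  ./test/run-e2e-tests.sh

  # or run only the new test against the same cluster
  go test sigs.k8s.io/descheduler/test/e2e/ -run TestLiveMigrationInBackground -v -timeout 0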