diff --git a/cmd/descheduler/app/server.go b/cmd/descheduler/app/server.go
index 65cbf5f9b..8dd9bfa29 100644
--- a/cmd/descheduler/app/server.go
+++ b/cmd/descheduler/app/server.go
@@ -73,7 +73,7 @@ func NewDeschedulerCommand(out io.Writer) *cobra.Command {
 			}
 
 			ctx, done := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
-			defer done()
+
 			pathRecorderMux := mux.NewPathRecorderMux("descheduler")
 			if !s.DisableMetrics {
 				pathRecorderMux.Handle("/metrics", legacyregistry.HandlerWithReset())
@@ -81,15 +81,20 @@ func NewDeschedulerCommand(out io.Writer) *cobra.Command {
 
 			healthz.InstallHandler(pathRecorderMux, healthz.NamedCheck("Descheduler", healthz.PingHealthz.Check))
 
-			if _, err := SecureServing.Serve(pathRecorderMux, 0, ctx.Done()); err != nil {
+			stoppedCh, err := SecureServing.Serve(pathRecorderMux, 0, ctx.Done())
+			if err != nil {
 				klog.Fatalf("failed to start secure server: %v", err)
 				return
 			}
 
-			err := Run(ctx, s)
+			err = Run(ctx, s)
 			if err != nil {
 				klog.ErrorS(err, "descheduler server")
 			}
+
+			done()
+			// wait for metrics server to close
+			<-stoppedCh
 		},
 	}
 	cmd.SetOut(out)
diff --git a/pkg/descheduler/descheduler.go b/pkg/descheduler/descheduler.go
index 05239d343..15ad38a06 100644
--- a/pkg/descheduler/descheduler.go
+++ b/pkg/descheduler/descheduler.go
@@ -69,13 +69,7 @@ func Run(ctx context.Context, rs *options.DeschedulerServer) error {
 		return err
 	}
 
-	// tie in root ctx with our wait stopChannel
-	stopChannel := make(chan struct{})
-	go func() {
-		<-ctx.Done()
-		close(stopChannel)
-	}()
-	return RunDeschedulerStrategies(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion, stopChannel)
+	return RunDeschedulerStrategies(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion)
 }
 
 type strategyFunction func(ctx context.Context, client clientset.Interface, strategy api.DeschedulerStrategy, nodes []*v1.Node, podEvictor *evictions.PodEvictor, getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc)
@@ -156,13 +150,16 @@ func cachedClient(
 	return fakeClient, nil
 }
 
-func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string, stopChannel chan struct{}) error {
+func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string) error {
 	sharedInformerFactory := informers.NewSharedInformerFactory(rs.Client, 0)
 	nodeInformer := sharedInformerFactory.Core().V1().Nodes()
 	podInformer := sharedInformerFactory.Core().V1().Pods()
 	namespaceInformer := sharedInformerFactory.Core().V1().Namespaces()
 	priorityClassInformer := sharedInformerFactory.Scheduling().V1().PriorityClasses()
 
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
 	// create the informers
 	namespaceInformer.Informer()
 	priorityClassInformer.Informer()
@@ -172,8 +169,8 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
 		return fmt.Errorf("build get pods assigned to node function error: %v", err)
 	}
 
-	sharedInformerFactory.Start(stopChannel)
-	sharedInformerFactory.WaitForCacheSync(stopChannel)
+	sharedInformerFactory.Start(ctx.Done())
+	sharedInformerFactory.WaitForCacheSync(ctx.Done())
 
 	strategyFuncs := map[api.StrategyName]strategyFunction{
 		"RemoveDuplicates": strategies.RemoveDuplicatePods,
@@ -223,13 +220,13 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
 		nodes, err := nodeutil.ReadyNodes(ctx, rs.Client, nodeInformer, nodeSelector)
 		if err != nil {
 			klog.V(1).InfoS("Unable to get ready nodes", "err", err)
-			close(stopChannel)
+			cancel()
 			return
 		}
 
 		if len(nodes) <= 1 {
 			klog.V(1).InfoS("The cluster size is 0 or 1 meaning eviction causes service disruption or degradation. So aborting..")
-			close(stopChannel)
+			cancel()
 			return
 		}
 
@@ -292,9 +289,9 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
 
 		// If there was no interval specified, send a signal to the stopChannel to end the wait.Until loop after 1 iteration
 		if rs.DeschedulingInterval.Seconds() == 0 {
-			close(stopChannel)
+			cancel()
 		}
-	}, rs.DeschedulingInterval, stopChannel)
+	}, rs.DeschedulingInterval, ctx.Done())
 
 	return nil
 }
diff --git a/pkg/descheduler/descheduler_test.go b/pkg/descheduler/descheduler_test.go
index 2a9f0256d..07c9692ac 100644
--- a/pkg/descheduler/descheduler_test.go
+++ b/pkg/descheduler/descheduler_test.go
@@ -35,9 +35,6 @@ func TestTaintsUpdated(t *testing.T) {
 		},
 	}
 
-	stopChannel := make(chan struct{})
-	defer close(stopChannel)
-
 	rs, err := options.NewDeschedulerServer()
 	if err != nil {
 		t.Fatalf("Unable to initialize server: %v", err)
@@ -47,7 +44,7 @@ func TestTaintsUpdated(t *testing.T) {
 	errChan := make(chan error, 1)
 	defer close(errChan)
 	go func() {
-		err := RunDeschedulerStrategies(ctx, rs, dp, "v1beta1", stopChannel)
+		err := RunDeschedulerStrategies(ctx, rs, dp, "v1beta1")
 		errChan <- err
 	}()
 	select {
@@ -101,3 +98,69 @@ func TestTaintsUpdated(t *testing.T) {
 		t.Fatalf("Unable to evict pod, node taint did not get propagated to descheduler strategies")
 	}
 }
+
+func TestRootCancel(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	n1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
+	n2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
+	client := fakeclientset.NewSimpleClientset(n1, n2)
+	dp := &api.DeschedulerPolicy{
+		Strategies: api.StrategyList{}, // no strategies needed for this test
+	}
+
+	rs, err := options.NewDeschedulerServer()
+	if err != nil {
+		t.Fatalf("Unable to initialize server: %v", err)
+	}
+	rs.Client = client
+	rs.DeschedulingInterval = 100 * time.Millisecond
+	errChan := make(chan error, 1)
+	defer close(errChan)
+
+	go func() {
+		err := RunDeschedulerStrategies(ctx, rs, dp, "v1beta1")
+		errChan <- err
+	}()
+	cancel()
+	select {
+	case err := <-errChan:
+		if err != nil {
+			t.Fatalf("Unable to run descheduler strategies: %v", err)
+		}
+	case <-time.After(1 * time.Second):
+		t.Fatal("Root ctx should have canceled immediately")
+	}
+}
+
+func TestRootCancelWithNoInterval(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	n1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
+	n2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
+	client := fakeclientset.NewSimpleClientset(n1, n2)
+	dp := &api.DeschedulerPolicy{
+		Strategies: api.StrategyList{}, // no strategies needed for this test
+	}
+
+	rs, err := options.NewDeschedulerServer()
+	if err != nil {
+		t.Fatalf("Unable to initialize server: %v", err)
+	}
+	rs.Client = client
+	rs.DeschedulingInterval = 0
+	errChan := make(chan error, 1)
+	defer close(errChan)
+
+	go func() {
+		err := RunDeschedulerStrategies(ctx, rs, dp, "v1beta1")
+		errChan <- err
+	}()
+	cancel()
+	select {
+	case err := <-errChan:
+		if err != nil {
+			t.Fatalf("Unable to run descheduler strategies: %v", err)
+		}
+	case <-time.After(1 * time.Second):
+		t.Fatal("Root ctx should have canceled immediately")
+	}
+}
diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go
index a832d463b..59f083bd3 100644
--- a/test/e2e/e2e_test.go
+++ b/test/e2e/e2e_test.go
@@ -984,9 +984,7 @@ func TestDeschedulingInterval(t *testing.T) {
 		if err != nil || len(evictionPolicyGroupVersion) == 0 {
 			t.Errorf("Error when checking support for eviction: %v", err)
 		}
-
-		stopChannel := make(chan struct{})
-		if err := descheduler.RunDeschedulerStrategies(ctx, s, deschedulerPolicy, evictionPolicyGroupVersion, stopChannel); err != nil {
+		if err := descheduler.RunDeschedulerStrategies(ctx, s, deschedulerPolicy, evictionPolicyGroupVersion); err != nil {
 			t.Errorf("Error running descheduler strategies: %+v", err)
 		}
 		c <- true
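
The patch above replaces the explicit `stopChannel` with context cancellation: `RunDeschedulerStrategies` derives a cancellable child context from the root context, calls `cancel()` where `close(stopChannel)` used to be, and hands `ctx.Done()` to the informer factory and to `wait.Until`. Below is a minimal, self-contained sketch of that pattern, not part of the patch; `runLoop`, `interval`, and the timings in `main` are illustrative and do not come from the descheduler code, only `context.WithCancel` and `wait.Until` mirror the change itself.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// runLoop derives a child context from the caller's context so that either a
// root cancellation (e.g. SIGINT) or a local cancel() stops the wait.Until loop.
func runLoop(ctx context.Context, interval time.Duration) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	wait.Until(func() {
		fmt.Println("descheduling cycle")

		// No interval configured: stop after a single iteration,
		// mirroring cancel() replacing close(stopChannel) above.
		if interval.Seconds() == 0 {
			cancel()
		}
	}, interval, ctx.Done())

	return nil
}

func main() {
	// Simulate a root context that is cancelled after a few cycles.
	ctx, stop := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer stop()

	_ = runLoop(ctx, 100*time.Millisecond)
}
```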