1
0
mirror of https://github.com/kubernetes-sigs/descheduler.git synced 2026-01-26 05:14:13 +01:00

Merge pull request #742 from damemi/ctx-cancel-backport

[release-1.23] Backport context cancel panic fix + Go version update
This commit is contained in:
Kubernetes Prow Robot
2022-02-28 10:55:46 -08:00
committed by GitHub
6 changed files with 90 additions and 27 deletions

View File

@@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
FROM golang:1.17.3 FROM golang:1.17.7
WORKDIR /go/src/sigs.k8s.io/descheduler WORKDIR /go/src/sigs.k8s.io/descheduler
COPY . . COPY . .

View File

@@ -1,7 +1,7 @@
apiVersion: v1 apiVersion: v1
name: descheduler name: descheduler
version: 0.23.1 version: 0.23.2
appVersion: 0.23.0 appVersion: 0.23.1
description: Descheduler for Kubernetes is used to rebalance clusters by evicting pods that can potentially be scheduled on better nodes. In the current implementation, descheduler does not schedule replacement of evicted pods but relies on the default scheduler for that. description: Descheduler for Kubernetes is used to rebalance clusters by evicting pods that can potentially be scheduled on better nodes. In the current implementation, descheduler does not schedule replacement of evicted pods but relies on the default scheduler for that.
keywords: keywords:
- kubernetes - kubernetes

View File

@@ -73,7 +73,7 @@ func NewDeschedulerCommand(out io.Writer) *cobra.Command {
} }
ctx, done := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) ctx, done := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer done()
pathRecorderMux := mux.NewPathRecorderMux("descheduler") pathRecorderMux := mux.NewPathRecorderMux("descheduler")
if !s.DisableMetrics { if !s.DisableMetrics {
pathRecorderMux.Handle("/metrics", legacyregistry.HandlerWithReset()) pathRecorderMux.Handle("/metrics", legacyregistry.HandlerWithReset())
@@ -81,15 +81,20 @@ func NewDeschedulerCommand(out io.Writer) *cobra.Command {
healthz.InstallHandler(pathRecorderMux, healthz.NamedCheck("Descheduler", healthz.PingHealthz.Check)) healthz.InstallHandler(pathRecorderMux, healthz.NamedCheck("Descheduler", healthz.PingHealthz.Check))
if _, err := SecureServing.Serve(pathRecorderMux, 0, ctx.Done()); err != nil { stoppedCh, err := SecureServing.Serve(pathRecorderMux, 0, ctx.Done())
if err != nil {
klog.Fatalf("failed to start secure server: %v", err) klog.Fatalf("failed to start secure server: %v", err)
return return
} }
err := Run(ctx, s) err = Run(ctx, s)
if err != nil { if err != nil {
klog.ErrorS(err, "descheduler server") klog.ErrorS(err, "descheduler server")
} }
done()
// wait for metrics server to close
<-stoppedCh
}, },
} }
cmd.SetOut(out) cmd.SetOut(out)

View File

@@ -69,13 +69,7 @@ func Run(ctx context.Context, rs *options.DeschedulerServer) error {
return err return err
} }
// tie in root ctx with our wait stopChannel return RunDeschedulerStrategies(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion)
stopChannel := make(chan struct{})
go func() {
<-ctx.Done()
close(stopChannel)
}()
return RunDeschedulerStrategies(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion, stopChannel)
} }
type strategyFunction func(ctx context.Context, client clientset.Interface, strategy api.DeschedulerStrategy, nodes []*v1.Node, podEvictor *evictions.PodEvictor, getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc) type strategyFunction func(ctx context.Context, client clientset.Interface, strategy api.DeschedulerStrategy, nodes []*v1.Node, podEvictor *evictions.PodEvictor, getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc)
@@ -156,13 +150,16 @@ func cachedClient(
return fakeClient, nil return fakeClient, nil
} }
func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string, stopChannel chan struct{}) error { func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string) error {
sharedInformerFactory := informers.NewSharedInformerFactory(rs.Client, 0) sharedInformerFactory := informers.NewSharedInformerFactory(rs.Client, 0)
nodeInformer := sharedInformerFactory.Core().V1().Nodes() nodeInformer := sharedInformerFactory.Core().V1().Nodes()
podInformer := sharedInformerFactory.Core().V1().Pods() podInformer := sharedInformerFactory.Core().V1().Pods()
namespaceInformer := sharedInformerFactory.Core().V1().Namespaces() namespaceInformer := sharedInformerFactory.Core().V1().Namespaces()
priorityClassInformer := sharedInformerFactory.Scheduling().V1().PriorityClasses() priorityClassInformer := sharedInformerFactory.Scheduling().V1().PriorityClasses()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// create the informers // create the informers
namespaceInformer.Informer() namespaceInformer.Informer()
priorityClassInformer.Informer() priorityClassInformer.Informer()
@@ -172,8 +169,8 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
return fmt.Errorf("build get pods assigned to node function error: %v", err) return fmt.Errorf("build get pods assigned to node function error: %v", err)
} }
sharedInformerFactory.Start(stopChannel) sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(stopChannel) sharedInformerFactory.WaitForCacheSync(ctx.Done())
strategyFuncs := map[api.StrategyName]strategyFunction{ strategyFuncs := map[api.StrategyName]strategyFunction{
"RemoveDuplicates": strategies.RemoveDuplicatePods, "RemoveDuplicates": strategies.RemoveDuplicatePods,
@@ -223,13 +220,13 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
nodes, err := nodeutil.ReadyNodes(ctx, rs.Client, nodeInformer, nodeSelector) nodes, err := nodeutil.ReadyNodes(ctx, rs.Client, nodeInformer, nodeSelector)
if err != nil { if err != nil {
klog.V(1).InfoS("Unable to get ready nodes", "err", err) klog.V(1).InfoS("Unable to get ready nodes", "err", err)
close(stopChannel) cancel()
return return
} }
if len(nodes) <= 1 { if len(nodes) <= 1 {
klog.V(1).InfoS("The cluster size is 0 or 1 meaning eviction causes service disruption or degradation. So aborting..") klog.V(1).InfoS("The cluster size is 0 or 1 meaning eviction causes service disruption or degradation. So aborting..")
close(stopChannel) cancel()
return return
} }
@@ -292,9 +289,9 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
// If there was no interval specified, send a signal to the stopChannel to end the wait.Until loop after 1 iteration // If there was no interval specified, send a signal to the stopChannel to end the wait.Until loop after 1 iteration
if rs.DeschedulingInterval.Seconds() == 0 { if rs.DeschedulingInterval.Seconds() == 0 {
close(stopChannel) cancel()
} }
}, rs.DeschedulingInterval, stopChannel) }, rs.DeschedulingInterval, ctx.Done())
return nil return nil
} }

View File

@@ -35,9 +35,6 @@ func TestTaintsUpdated(t *testing.T) {
}, },
} }
stopChannel := make(chan struct{})
defer close(stopChannel)
rs, err := options.NewDeschedulerServer() rs, err := options.NewDeschedulerServer()
if err != nil { if err != nil {
t.Fatalf("Unable to initialize server: %v", err) t.Fatalf("Unable to initialize server: %v", err)
@@ -47,7 +44,7 @@ func TestTaintsUpdated(t *testing.T) {
errChan := make(chan error, 1) errChan := make(chan error, 1)
defer close(errChan) defer close(errChan)
go func() { go func() {
err := RunDeschedulerStrategies(ctx, rs, dp, "v1beta1", stopChannel) err := RunDeschedulerStrategies(ctx, rs, dp, "v1beta1")
errChan <- err errChan <- err
}() }()
select { select {
@@ -101,3 +98,69 @@ func TestTaintsUpdated(t *testing.T) {
t.Fatalf("Unable to evict pod, node taint did not get propagated to descheduler strategies") t.Fatalf("Unable to evict pod, node taint did not get propagated to descheduler strategies")
} }
} }
func TestRootCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
n1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
n2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
client := fakeclientset.NewSimpleClientset(n1, n2)
dp := &api.DeschedulerPolicy{
Strategies: api.StrategyList{}, // no strategies needed for this test
}
rs, err := options.NewDeschedulerServer()
if err != nil {
t.Fatalf("Unable to initialize server: %v", err)
}
rs.Client = client
rs.DeschedulingInterval = 100 * time.Millisecond
errChan := make(chan error, 1)
defer close(errChan)
go func() {
err := RunDeschedulerStrategies(ctx, rs, dp, "v1beta1")
errChan <- err
}()
cancel()
select {
case err := <-errChan:
if err != nil {
t.Fatalf("Unable to run descheduler strategies: %v", err)
}
case <-time.After(1 * time.Second):
t.Fatal("Root ctx should have canceled immediately")
}
}
func TestRootCancelWithNoInterval(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
n1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
n2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
client := fakeclientset.NewSimpleClientset(n1, n2)
dp := &api.DeschedulerPolicy{
Strategies: api.StrategyList{}, // no strategies needed for this test
}
rs, err := options.NewDeschedulerServer()
if err != nil {
t.Fatalf("Unable to initialize server: %v", err)
}
rs.Client = client
rs.DeschedulingInterval = 0
errChan := make(chan error, 1)
defer close(errChan)
go func() {
err := RunDeschedulerStrategies(ctx, rs, dp, "v1beta1")
errChan <- err
}()
cancel()
select {
case err := <-errChan:
if err != nil {
t.Fatalf("Unable to run descheduler strategies: %v", err)
}
case <-time.After(1 * time.Second):
t.Fatal("Root ctx should have canceled immediately")
}
}

View File

@@ -984,9 +984,7 @@ func TestDeschedulingInterval(t *testing.T) {
if err != nil || len(evictionPolicyGroupVersion) == 0 { if err != nil || len(evictionPolicyGroupVersion) == 0 {
t.Errorf("Error when checking support for eviction: %v", err) t.Errorf("Error when checking support for eviction: %v", err)
} }
if err := descheduler.RunDeschedulerStrategies(ctx, s, deschedulerPolicy, evictionPolicyGroupVersion); err != nil {
stopChannel := make(chan struct{})
if err := descheduler.RunDeschedulerStrategies(ctx, s, deschedulerPolicy, evictionPolicyGroupVersion, stopChannel); err != nil {
t.Errorf("Error running descheduler strategies: %+v", err) t.Errorf("Error running descheduler strategies: %+v", err)
} }
c <- true c <- true