
use new events implementation and take recorder out of EvictPod

Lucas Severo Alves
2022-07-21 16:59:42 +02:00
committed by Lucas Severo Alves
parent 0d3ff8a84f
commit 0aa233415e
20 changed files with 139 additions and 15 deletions
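In short: instead of EvictPod constructing a core/v1 record.Broadcaster and recorder for every single eviction, one events.EventRecorder (from k8s.io/client-go/tools/events) is now created up front and injected into the PodEvictor. A minimal, self-contained sketch of the new event path follows; the helper name emitDescheduledEvent is illustrative and not part of the commit.

package example

import (
	"context"

	v1 "k8s.io/api/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/events"
)

// emitDescheduledEvent: one broadcaster adapter per process, one recorder
// derived from it, and Eventf to report the eviction. In the descheduler
// itself the broadcaster lives for the whole run and is shut down on exit.
func emitDescheduledEvent(ctx context.Context, client clientset.Interface, pod *v1.Pod, reason string) {
	eventBroadcaster := events.NewEventBroadcasterAdapter(client)
	eventBroadcaster.StartRecordingToSink(ctx.Done())
	defer eventBroadcaster.Shutdown()

	eventRecorder := eventBroadcaster.NewRecorder("sigs.k8s.io.descheduler")
	eventRecorder.Eventf(pod, nil, v1.EventTypeNormal, reason, "Descheduled", "pod evicted by sigs.k8s.io/descheduler")
}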

View File

@@ -26,7 +26,7 @@ import (
"k8s.io/client-go/tools/clientcmd"
)
-func CreateClient(kubeconfig string) (clientset.Interface, error) {
+func CreateClient(kubeconfig string, userAgt string) (clientset.Interface, error) {
var cfg *rest.Config
if len(kubeconfig) != 0 {
master, err := GetMasterFromKubeconfig(kubeconfig)
@@ -47,7 +47,11 @@ func CreateClient(kubeconfig string) (clientset.Interface, error) {
}
}
-return clientset.NewForConfig(cfg)
+if len(userAgt) != 0 {
+return clientset.NewForConfig(rest.AddUserAgent(cfg, userAgt))
+} else {
+return clientset.NewForConfig(cfg)
+}
}
func GetMasterFromKubeconfig(filename string) (string, error) {
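The new userAgt parameter only decides whether the config is passed through rest.AddUserAgent before the clientset is built. A sketch of the same idea in isolation; newClientWithAgent and the kubeconfig handling here are illustrative, and the real CreateClient additionally resolves the master from the kubeconfig:

package example

import (
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

// newClientWithAgent: rest.AddUserAgent sets cfg.UserAgent to the default
// Kubernetes user agent plus the given suffix, so API-server-side logs and
// metrics can attribute requests to the descheduler specifically.
func newClientWithAgent(kubeconfig, userAgent string) (clientset.Interface, error) {
	cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		return nil, err
	}
	if userAgent != "" {
		cfg = rest.AddUserAgent(cfg, userAgent)
	}
	return clientset.NewForConfig(cfg)
}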

View File

@@ -53,11 +53,12 @@ import (
func Run(ctx context.Context, rs *options.DeschedulerServer) error {
metrics.Register()
-rsclient, err := client.CreateClient(rs.KubeconfigFile)
+rsclient, eventClient, err := createClients(rs.KubeconfigFile)
if err != nil {
return err
}
rs.Client = rsclient
rs.EventClient = eventClient
deschedulerPolicy, err := LoadPolicyConfig(rs.PolicyConfigFile)
if err != nil {
@@ -287,6 +288,16 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
ignorePvcPods = *deschedulerPolicy.IgnorePVCPods
}
var eventClient clientset.Interface
if rs.DryRun {
eventClient = fakeclientset.NewSimpleClientset()
} else {
eventClient = rs.Client
}
eventBroadcaster, eventRecorder := utils.GetRecorderAndBroadcaster(ctx, eventClient)
defer eventBroadcaster.Shutdown()
wait.NonSlidingUntil(func() {
nodes, err := nodeutil.ReadyNodes(ctx, rs.Client, nodeInformer, nodeSelector)
if err != nil {
@@ -340,6 +351,7 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
deschedulerPolicy.MaxNoOfPodsToEvictPerNamespace,
nodes,
!rs.DisableMetrics,
eventRecorder,
)
for name, strategy := range deschedulerPolicy.Strategies {
@@ -413,3 +425,17 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
return nil
}
func createClients(kubeconfig string) (clientset.Interface, clientset.Interface, error) {
kClient, err := client.CreateClient(kubeconfig, "descheduler")
if err != nil {
return nil, nil, err
}
eventClient, err := client.CreateClient(kubeconfig, "")
if err != nil {
return nil, nil, err
}
return kClient, eventClient, nil
}
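The additions above work together: createClients returns a main client tagged with the "descheduler" user agent plus an untagged client for events, and RunDeschedulerStrategies swaps in a fake clientset for event recording when running in dry-run mode. A condensed sketch of the recorder selection; the helper name newEventRecorder is illustrative and not part of the commit:

package example

import (
	"context"

	clientset "k8s.io/client-go/kubernetes"
	fakeclientset "k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/events"
	"sigs.k8s.io/descheduler/pkg/utils"
)

// newEventRecorder condenses the block added to RunDeschedulerStrategies: in
// dry-run mode events are recorded against a throwaway fake clientset, so the
// descheduler never writes Event objects to a cluster it is only simulating.
func newEventRecorder(ctx context.Context, realClient clientset.Interface, dryRun bool) (events.EventBroadcasterAdapter, events.EventRecorder) {
	eventClient := realClient
	if dryRun {
		eventClient = fakeclientset.NewSimpleClientset()
	}
	eventBroadcaster, eventRecorder := utils.GetRecorderAndBroadcaster(ctx, eventClient)
	// The caller is expected to defer eventBroadcaster.Shutdown().
	return eventBroadcaster, eventRecorder
}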

View File

@@ -27,6 +27,7 @@ func TestTaintsUpdated(t *testing.T) {
}
client := fakeclientset.NewSimpleClientset(n1, n2, p1)
eventClient := fakeclientset.NewSimpleClientset(n1, n2, p1)
dp := &api.DeschedulerPolicy{
Strategies: api.StrategyList{
"RemovePodsViolatingNodeTaints": api.DeschedulerStrategy{
@@ -40,6 +41,7 @@ func TestTaintsUpdated(t *testing.T) {
t.Fatalf("Unable to initialize server: %v", err)
}
rs.Client = client
rs.EventClient = eventClient
rs.DeschedulingInterval = 100 * time.Millisecond
errChan := make(chan error, 1)
defer close(errChan)
@@ -104,6 +106,7 @@ func TestRootCancel(t *testing.T) {
n1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
n2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
client := fakeclientset.NewSimpleClientset(n1, n2)
eventClient := fakeclientset.NewSimpleClientset(n1, n2)
dp := &api.DeschedulerPolicy{
Strategies: api.StrategyList{}, // no strategies needed for this test
}
@@ -113,6 +116,7 @@ func TestRootCancel(t *testing.T) {
t.Fatalf("Unable to initialize server: %v", err)
}
rs.Client = client
rs.EventClient = eventClient
rs.DeschedulingInterval = 100 * time.Millisecond
errChan := make(chan error, 1)
defer close(errChan)
@@ -137,6 +141,7 @@ func TestRootCancelWithNoInterval(t *testing.T) {
n1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
n2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
client := fakeclientset.NewSimpleClientset(n1, n2)
eventClient := fakeclientset.NewSimpleClientset(n1, n2)
dp := &api.DeschedulerPolicy{
Strategies: api.StrategyList{}, // no strategies needed for this test
}
@@ -146,6 +151,7 @@ func TestRootCancelWithNoInterval(t *testing.T) {
t.Fatalf("Unable to initialize server: %v", err)
}
rs.Client = client
rs.EventClient = eventClient
rs.DeschedulingInterval = 0
errChan := make(chan error, 1)
defer close(errChan)

View File

@@ -27,9 +27,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/errors"
clientset "k8s.io/client-go/kubernetes"
-"k8s.io/client-go/kubernetes/scheme"
-clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
-"k8s.io/client-go/tools/record"
+"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"
"sigs.k8s.io/descheduler/metrics"
nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
@@ -57,6 +55,7 @@ type PodEvictor struct {
nodepodCount nodePodEvictedCount
namespacePodCount namespacePodEvictCount
metricsEnabled bool
eventRecorder events.EventRecorder
}
func NewPodEvictor(
@@ -67,6 +66,7 @@ func NewPodEvictor(
maxPodsToEvictPerNamespace *uint,
nodes []*v1.Node,
metricsEnabled bool,
eventRecorder events.EventRecorder,
) *PodEvictor {
var nodePodCount = make(nodePodEvictedCount)
var namespacePodCount = make(namespacePodEvictCount)
@@ -85,6 +85,7 @@ func NewPodEvictor(
nodepodCount: nodePodCount,
namespacePodCount: namespacePodCount,
metricsEnabled: metricsEnabled,
eventRecorder: eventRecorder,
}
}
@@ -166,11 +167,14 @@ func (pe *PodEvictor) EvictPod(ctx context.Context, pod *v1.Pod, opts EvictOptio
klog.V(1).InfoS("Evicted pod in dry run mode", "pod", klog.KObj(pod), "reason", opts.Reason, "strategy", strategy, "node", pod.Spec.NodeName)
} else {
klog.V(1).InfoS("Evicted pod", "pod", klog.KObj(pod), "reason", opts.Reason, "strategy", strategy, "node", pod.Spec.NodeName)
-eventBroadcaster := record.NewBroadcaster()
-eventBroadcaster.StartStructuredLogging(3)
-eventBroadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{Interface: pe.client.CoreV1().Events(pod.Namespace)})
-r := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "sigs.k8s.io.descheduler"})
-r.Event(pod, v1.EventTypeNormal, "Descheduled", fmt.Sprintf("pod evicted by sigs.k8s.io/descheduler%s", opts.Reason))
+reason := opts.Reason
+if len(reason) == 0 {
+reason = strategy
+if len(reason) == 0 {
+reason = "NotSet"
+}
+}
+pe.eventRecorder.Eventf(pod, nil, v1.EventTypeNormal, reason, "Descheduled", "pod evicted by sigs.k8s.io/descheduler")
}
return true
}
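With the recorder injected, EvictPod only has to pick an event reason — the explicit opts.Reason, else the strategy name, else a fixed "NotSet" — and then call pe.eventRecorder.Eventf. The fallback logic, pulled out into an illustrative helper (eventReason is not part of the commit):

package example

// eventReason mirrors the fallback added to EvictPod: prefer the explicit
// eviction reason, then the strategy name, then a fixed "NotSet" marker,
// which becomes the reason of the recorded "Descheduled" event.
func eventReason(optsReason, strategy string) string {
	if optsReason != "" {
		return optsReason
	}
	if strategy != "" {
		return strategy
	}
	return "NotSet"
}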

View File

@@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/events"
"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
@@ -313,6 +314,8 @@ func TestFindDuplicatePods(t *testing.T) {
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())
eventRecorder := &events.FakeRecorder{}
podEvictor := evictions.NewPodEvictor(
fakeClient,
"v1",
@@ -321,6 +324,7 @@ func TestFindDuplicatePods(t *testing.T) {
nil,
testCase.nodes,
false,
eventRecorder,
)
nodeFit := false
@@ -751,6 +755,8 @@ func TestRemoveDuplicatesUniformly(t *testing.T) {
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())
eventRecorder := &events.FakeRecorder{}
podEvictor := evictions.NewPodEvictor(
fakeClient,
policyv1.SchemeGroupVersion.String(),
@@ -759,6 +765,7 @@ func TestRemoveDuplicatesUniformly(t *testing.T) {
nil,
testCase.nodes,
false,
eventRecorder,
)
evictorFilter := evictions.NewEvictorFilter(
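The remaining strategy tests follow the same pattern from here on: construct an events.FakeRecorder and pass it as the new last argument of NewPodEvictor. If a test wants to assert on the recorded event, the fake's exported Events channel can be buffered (this field is an assumption mirrored from the record package's fake); a minimal illustrative test, not part of the commit:

package example

import (
	"testing"

	"k8s.io/client-go/tools/events"
)

// TestEvictionEmitsEvent sketches the assertion pattern: a buffered channel
// captures the text that EvictPod formats through the fake recorder.
func TestEvictionEmitsEvent(t *testing.T) {
	eventRecorder := &events.FakeRecorder{Events: make(chan string, 1)}

	// ... build the fake clientset, nodes and PodEvictor with eventRecorder,
	// run the strategy under test, then inspect the channel:
	select {
	case e := <-eventRecorder.Events:
		t.Logf("recorded event: %s", e)
	default:
		t.Log("no event recorded")
	}
}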

View File

@@ -10,6 +10,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/events"
"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
@@ -268,6 +269,8 @@ func TestRemoveFailedPods(t *testing.T) {
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())
eventRecorder := &events.FakeRecorder{}
podEvictor := evictions.NewPodEvictor(
fakeClient,
policyv1.SchemeGroupVersion.String(),
@@ -276,6 +279,7 @@ func TestRemoveFailedPods(t *testing.T) {
nil,
tc.nodes,
false,
eventRecorder,
)
evictorFilter := evictions.NewEvictorFilter(

View File

@@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/events"
"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
@@ -215,6 +216,8 @@ func TestRemovePodsViolatingNodeAffinity(t *testing.T) {
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())
eventRecorder := &events.FakeRecorder{}
podEvictor := evictions.NewPodEvictor(
fakeClient,
policyv1.SchemeGroupVersion.String(),
@@ -223,6 +226,7 @@ func TestRemovePodsViolatingNodeAffinity(t *testing.T) {
tc.maxNoOfPodsToEvictPerNamespace,
tc.nodes,
false,
eventRecorder,
)
nodeFit := false

View File

@@ -28,6 +28,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/events"
"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
@@ -500,6 +501,8 @@ func TestHighNodeUtilization(t *testing.T) {
// return true, nil, fmt.Errorf("Wrong node: %v", getAction.GetName())
//})
eventRecorder := &events.FakeRecorder{}
podEvictor := evictions.NewPodEvictor(
fakeClient,
"v1",
@@ -508,6 +511,7 @@ func TestHighNodeUtilization(t *testing.T) {
nil,
testCase.nodes,
false,
eventRecorder,
)
strategy := api.DeschedulerStrategy{
@@ -712,6 +716,8 @@ func TestHighNodeUtilizationWithTaints(t *testing.T) {
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())
eventRecorder := &events.FakeRecorder{}
podEvictor := evictions.NewPodEvictor(
fakeClient,
"policy/v1",
@@ -720,6 +726,7 @@ func TestHighNodeUtilizationWithTaints(t *testing.T) {
nil,
item.nodes,
false,
eventRecorder,
)
evictorFilter := evictions.NewEvictorFilter(

View File

@@ -29,6 +29,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/events"
"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
@@ -765,6 +766,8 @@ func TestLowNodeUtilization(t *testing.T) {
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())
eventRecorder := &events.FakeRecorder{}
podEvictor := evictions.NewPodEvictor(
fakeClient,
policyv1.SchemeGroupVersion.String(),
@@ -773,6 +776,7 @@ func TestLowNodeUtilization(t *testing.T) {
nil,
test.nodes,
false,
eventRecorder,
)
strategy := api.DeschedulerStrategy{
@@ -1086,6 +1090,8 @@ func TestLowNodeUtilizationWithTaints(t *testing.T) {
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())
eventRecorder := &events.FakeRecorder{}
podEvictor := evictions.NewPodEvictor(
fakeClient,
policyv1.SchemeGroupVersion.String(),
@@ -1094,6 +1100,7 @@ func TestLowNodeUtilizationWithTaints(t *testing.T) {
nil,
item.nodes,
false,
eventRecorder,
)
evictorFilter := evictions.NewEvictorFilter(

View File

@@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/events"
"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
@@ -211,6 +212,8 @@ func TestPodAntiAffinity(t *testing.T) {
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())
eventRecorder := &events.FakeRecorder{}
podEvictor := evictions.NewPodEvictor(
fakeClient,
policyv1.SchemeGroupVersion.String(),
@@ -219,6 +222,7 @@ func TestPodAntiAffinity(t *testing.T) {
test.maxNoOfPodsToEvictPerNamespace,
test.nodes,
false,
eventRecorder,
)
strategy := api.DeschedulerStrategy{
Params: &api.StrategyParameters{

View File

@@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/events"
"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
@@ -389,6 +390,8 @@ func TestPodLifeTime(t *testing.T) {
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())
eventRecorder := &events.FakeRecorder{}
podEvictor := evictions.NewPodEvictor(
fakeClient,
policyv1.SchemeGroupVersion.String(),
@@ -397,6 +400,7 @@ func TestPodLifeTime(t *testing.T) {
tc.maxPodsToEvictPerNamespace,
tc.nodes,
false,
eventRecorder,
)
evictorFilter := evictions.NewEvictorFilter(

View File

@@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/events"
"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
@@ -243,6 +244,8 @@ func TestRemovePodsHavingTooManyRestarts(t *testing.T) {
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())
eventRecorder := &events.FakeRecorder{}
podEvictor := evictions.NewPodEvictor(
fakeClient,
policyv1.SchemeGroupVersion.String(),
@@ -251,6 +254,7 @@ func TestRemovePodsHavingTooManyRestarts(t *testing.T) {
tc.maxNoOfPodsToEvictPerNamespace,
tc.nodes,
false,
eventRecorder,
)
evictorFilter := evictions.NewEvictorFilter(

View File

@@ -13,6 +13,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/events"
"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
@@ -1215,6 +1216,8 @@ func TestTopologySpreadConstraint(t *testing.T) {
return false, nil, nil // fallback to the default reactor
})
eventRecorder := &events.FakeRecorder{}
podEvictor := evictions.NewPodEvictor(
fakeClient,
"v1",
@@ -1223,6 +1226,7 @@ func TestTopologySpreadConstraint(t *testing.T) {
nil,
tc.nodes,
false,
eventRecorder,
)
nodeFit := false

View File

@@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/events"
"sigs.k8s.io/descheduler/pkg/apis/componentconfig"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
@@ -349,6 +350,8 @@ func TestDeletePodsViolatingNodeTaints(t *testing.T) {
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())
eventRecorder := &events.FakeRecorder{}
podEvictor := evictions.NewPodEvictor(
fakeClient,
policyv1.SchemeGroupVersion.String(),
@@ -357,6 +360,7 @@ func TestDeletePodsViolatingNodeTaints(t *testing.T) {
tc.maxNoOfPodsToEvictPerNamespace,
tc.nodes,
false,
eventRecorder,
)
handle := &frameworkfake.HandleImpl{

pkg/utils/events.go Normal file
View File

@@ -0,0 +1,15 @@
package utils
import (
"context"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/events"
)
func GetRecorderAndBroadcaster(ctx context.Context, clientset clientset.Interface) (events.EventBroadcasterAdapter, events.EventRecorder) {
eventBroadcaster := events.NewEventBroadcasterAdapter(clientset)
eventBroadcaster.StartRecordingToSink(ctx.Done())
eventRecorder := eventBroadcaster.NewRecorder("sigs.k8s.io.descheduler")
return eventBroadcaster, eventRecorder
}
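A usage sketch for the new helper (the in-cluster config here is an assumption for brevity). The name handed to NewRecorder, "sigs.k8s.io.descheduler", plays roughly the role the old v1.EventSource{Component: ...} did for core/v1 events:

package main

import (
	"context"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"sigs.k8s.io/descheduler/pkg/utils"
)

func main() {
	ctx := context.Background()
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	eventBroadcaster, eventRecorder := utils.GetRecorderAndBroadcaster(ctx, client)
	defer eventBroadcaster.Shutdown()
	_ = eventRecorder // hand this to evictions.NewPodEvictor
}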