Mirror of https://github.com/kubernetes-sigs/descheduler.git (synced 2026-01-26 05:14:13 +01:00)
Merge pull request #888 from knelasevero/fix-memory-leak-shutdown-broadcaster
fix: events memory leak. Use the new events implementation and take the recorder out of EvictPod
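In short, the leak was that event machinery was rebuilt for every eviction and never torn down. After this change the descheduler builds a single events.k8s.io broadcaster/recorder pair when the strategy loop starts, reuses that recorder for every eviction, and shuts the broadcaster down when the loop exits. A minimal sketch of that lifecycle, assuming a cancellable context and an existing clientset (the function and the reason string below are illustrative, not code from this PR):

    package main

    import (
        "context"

        v1 "k8s.io/api/core/v1"
        clientset "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/events"
    )

    // runWithEvents wires up one broadcaster per process instead of one per eviction.
    func runWithEvents(ctx context.Context, client clientset.Interface, pod *v1.Pod) {
        // Adapter that can record through either the events.k8s.io or core/v1 events API.
        broadcaster := events.NewEventBroadcasterAdapter(client)
        broadcaster.StartRecordingToSink(ctx.Done())
        // Shutdown stops the broadcaster's goroutines and watchers; deferring it once
        // here is exactly what the old per-eviction broadcasters never did.
        defer broadcaster.Shutdown()

        recorder := broadcaster.NewRecorder("sigs.k8s.io.descheduler")
        // The same recorder is reused for every eviction for the life of the process.
        recorder.Eventf(pod, nil, v1.EventTypeNormal, "PodLifeTime", "Descheduled",
            "pod evicted by sigs.k8s.io/descheduler")
    }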
@@ -18,6 +18,8 @@ limitations under the License.
package options

import (
"time"

"github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apiserveroptions "k8s.io/apiserver/pkg/server/options"
@@ -27,7 +29,6 @@ import (
"sigs.k8s.io/descheduler/pkg/apis/componentconfig"
"sigs.k8s.io/descheduler/pkg/apis/componentconfig/v1alpha1"
deschedulerscheme "sigs.k8s.io/descheduler/pkg/descheduler/scheme"
"time"
)

const (
@@ -39,6 +40,7 @@ type DeschedulerServer struct {
componentconfig.DeschedulerConfiguration

Client clientset.Interface
EventClient clientset.Interface
SecureServing *apiserveroptions.SecureServingOptionsWithLoopback
DisableMetrics bool
}

@@ -4,7 +4,7 @@ apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: descheduler-cluster-role
rules:
- apiGroups: [""]
- apiGroups: ["events.k8s.io"]
resources: ["events"]
verbs: ["create", "update"]
- apiGroups: [""]

@@ -26,7 +26,7 @@ import (
"k8s.io/client-go/tools/clientcmd"
)

func CreateClient(kubeconfig string) (clientset.Interface, error) {
func CreateClient(kubeconfig string, userAgt string) (clientset.Interface, error) {
var cfg *rest.Config
if len(kubeconfig) != 0 {
master, err := GetMasterFromKubeconfig(kubeconfig)
@@ -47,7 +47,11 @@ func CreateClient(kubeconfig string) (clientset.Interface, error) {
}
}

return clientset.NewForConfig(cfg)
if len(userAgt) != 0 {
return clientset.NewForConfig(rest.AddUserAgent(cfg, userAgt))
} else {
return clientset.NewForConfig(cfg)
}
}

func GetMasterFromKubeconfig(filename string) (string, error) {
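A note on the CreateClient change above: when a non-empty user agent is passed, rest.AddUserAgent stamps it onto the rest.Config before the clientset is built, so descheduler-originated requests are easy to attribute in API server logs and audit entries. A simplified sketch of the new shape (helper name is illustrative, and it skips the kubeconfig/master handling the real function does):

    package main

    import (
        clientset "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/rest"
        "k8s.io/client-go/tools/clientcmd"
    )

    // newClientWithUserAgent mirrors the idea of the new CreateClient signature:
    // an optional user agent is appended to client-go's default one.
    func newClientWithUserAgent(kubeconfig, userAgent string) (clientset.Interface, error) {
        cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
        if err != nil {
            return nil, err
        }
        if userAgent != "" {
            // Requests now carry the extra component (e.g. ".../descheduler")
            // in their User-Agent header.
            cfg = rest.AddUserAgent(cfg, userAgent)
        }
        return clientset.NewForConfig(cfg)
    }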
@@ -53,11 +53,12 @@ import (
func Run(ctx context.Context, rs *options.DeschedulerServer) error {
metrics.Register()

rsclient, err := client.CreateClient(rs.KubeconfigFile)
rsclient, eventClient, err := createClients(rs.KubeconfigFile)
if err != nil {
return err
}
rs.Client = rsclient
rs.EventClient = eventClient

deschedulerPolicy, err := LoadPolicyConfig(rs.PolicyConfigFile)
if err != nil {
@@ -287,6 +288,16 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
ignorePvcPods = *deschedulerPolicy.IgnorePVCPods
}

var eventClient clientset.Interface
if rs.DryRun {
eventClient = fakeclientset.NewSimpleClientset()
} else {
eventClient = rs.Client
}

eventBroadcaster, eventRecorder := utils.GetRecorderAndBroadcaster(ctx, eventClient)
defer eventBroadcaster.Shutdown()

wait.NonSlidingUntil(func() {
nodes, err := nodeutil.ReadyNodes(ctx, rs.Client, nodeInformer, nodeSelector)
if err != nil {
@@ -340,6 +351,7 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
deschedulerPolicy.MaxNoOfPodsToEvictPerNamespace,
nodes,
!rs.DisableMetrics,
eventRecorder,
)

for name, strategy := range deschedulerPolicy.Strategies {
@@ -413,3 +425,17 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer

return nil
}

func createClients(kubeconfig string) (clientset.Interface, clientset.Interface, error) {
kClient, err := client.CreateClient(kubeconfig, "descheduler")
if err != nil {
return nil, nil, err
}

eventClient, err := client.CreateClient(kubeconfig, "")
if err != nil {
return nil, nil, err
}

return kClient, eventClient, nil
}

@@ -27,6 +27,7 @@ func TestTaintsUpdated(t *testing.T) {
}

client := fakeclientset.NewSimpleClientset(n1, n2, p1)
eventClient := fakeclientset.NewSimpleClientset(n1, n2, p1)
dp := &api.DeschedulerPolicy{
Strategies: api.StrategyList{
"RemovePodsViolatingNodeTaints": api.DeschedulerStrategy{
@@ -40,6 +41,7 @@ func TestTaintsUpdated(t *testing.T) {
t.Fatalf("Unable to initialize server: %v", err)
}
rs.Client = client
rs.EventClient = eventClient
rs.DeschedulingInterval = 100 * time.Millisecond
errChan := make(chan error, 1)
defer close(errChan)
@@ -104,6 +106,7 @@ func TestRootCancel(t *testing.T) {
n1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
n2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
client := fakeclientset.NewSimpleClientset(n1, n2)
eventClient := fakeclientset.NewSimpleClientset(n1, n2)
dp := &api.DeschedulerPolicy{
Strategies: api.StrategyList{}, // no strategies needed for this test
}
@@ -113,6 +116,7 @@ func TestRootCancel(t *testing.T) {
t.Fatalf("Unable to initialize server: %v", err)
}
rs.Client = client
rs.EventClient = eventClient
rs.DeschedulingInterval = 100 * time.Millisecond
errChan := make(chan error, 1)
defer close(errChan)
@@ -137,6 +141,7 @@ func TestRootCancelWithNoInterval(t *testing.T) {
n1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
n2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
client := fakeclientset.NewSimpleClientset(n1, n2)
eventClient := fakeclientset.NewSimpleClientset(n1, n2)
dp := &api.DeschedulerPolicy{
Strategies: api.StrategyList{}, // no strategies needed for this test
}
@@ -146,6 +151,7 @@ func TestRootCancelWithNoInterval(t *testing.T) {
t.Fatalf("Unable to initialize server: %v", err)
}
rs.Client = client
rs.EventClient = eventClient
rs.DeschedulingInterval = 0
errChan := make(chan error, 1)
defer close(errChan)

@@ -27,9 +27,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/errors"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"
"sigs.k8s.io/descheduler/metrics"
nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
@@ -57,6 +55,7 @@ type PodEvictor struct {
nodepodCount nodePodEvictedCount
namespacePodCount namespacePodEvictCount
metricsEnabled bool
eventRecorder events.EventRecorder
}

func NewPodEvictor(
@@ -67,6 +66,7 @@ func NewPodEvictor(
maxPodsToEvictPerNamespace *uint,
nodes []*v1.Node,
metricsEnabled bool,
eventRecorder events.EventRecorder,
) *PodEvictor {
var nodePodCount = make(nodePodEvictedCount)
var namespacePodCount = make(namespacePodEvictCount)
@@ -85,6 +85,7 @@ func NewPodEvictor(
nodepodCount: nodePodCount,
namespacePodCount: namespacePodCount,
metricsEnabled: metricsEnabled,
eventRecorder: eventRecorder,
}
}

@@ -166,11 +167,14 @@ func (pe *PodEvictor) EvictPod(ctx context.Context, pod *v1.Pod, opts EvictOptio
klog.V(1).InfoS("Evicted pod in dry run mode", "pod", klog.KObj(pod), "reason", opts.Reason, "strategy", strategy, "node", pod.Spec.NodeName)
} else {
klog.V(1).InfoS("Evicted pod", "pod", klog.KObj(pod), "reason", opts.Reason, "strategy", strategy, "node", pod.Spec.NodeName)
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(3)
eventBroadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{Interface: pe.client.CoreV1().Events(pod.Namespace)})
r := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "sigs.k8s.io.descheduler"})
r.Event(pod, v1.EventTypeNormal, "Descheduled", fmt.Sprintf("pod evicted by sigs.k8s.io/descheduler%s", opts.Reason))
reason := opts.Reason
if len(reason) == 0 {
reason = strategy
if len(reason) == 0 {
reason = "NotSet"
}
}
pe.eventRecorder.Eventf(pod, nil, v1.EventTypeNormal, reason, "Descheduled", "pod evicted by sigs.k8s.io/descheduler")
}
return true
}
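Two things are worth calling out in the EvictPod hunk above. First, the leak itself: the old code created a fresh record.NewBroadcaster (its own goroutines plus a sink writer) on every successful eviction and never called Shutdown on it, so a long-running descheduler kept accumulating event machinery; the new code reuses the single recorder injected through NewPodEvictor. Second, the event reason now falls back from the caller-supplied opts.Reason to the strategy name and finally to "NotSet". That precedence, restated as a standalone helper (hypothetical, not part of the PR):

    // eventReason restates the fallback order used by the new EvictPod code.
    func eventReason(optsReason, strategy string) string {
        if optsReason != "" {
            return optsReason // an explicit reason from the caller wins
        }
        if strategy != "" {
            return strategy // otherwise attribute the event to the strategy
        }
        return "NotSet" // last resort so the event always carries a reason
    }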
@@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/events"

"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
@@ -313,6 +314,8 @@ func TestFindDuplicatePods(t *testing.T) {
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())

eventRecorder := &events.FakeRecorder{}

podEvictor := evictions.NewPodEvictor(
fakeClient,
"v1",
@@ -321,6 +324,7 @@ func TestFindDuplicatePods(t *testing.T) {
nil,
testCase.nodes,
false,
eventRecorder,
)

nodeFit := false
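The unit test hunks in this PR (the one above and those below) all follow the same pattern: NewPodEvictor is handed a zero-value &events.FakeRecorder{}, whose Events channel is nil, so recorded events are simply discarded. If a test wanted to assert on the emitted event instead, a buffered fake recorder works; a hedged sketch (the test and pod names are illustrative, and the exact message format is an implementation detail of the fake):

    package evictions_test

    import (
        "strings"
        "testing"

        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/tools/events"
    )

    func TestFakeRecorderCapturesEvents(t *testing.T) {
        // Buffered channel so Eventf never blocks the code under test.
        recorder := events.NewFakeRecorder(16)

        pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1", Namespace: "default"}}
        recorder.Eventf(pod, nil, v1.EventTypeNormal, "PodLifeTime", "Descheduled",
            "pod evicted by sigs.k8s.io/descheduler")

        select {
        case got := <-recorder.Events:
            if !strings.Contains(got, "PodLifeTime") {
                t.Fatalf("unexpected event: %q", got)
            }
        default:
            t.Fatal("expected an event to be recorded")
        }
    }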
@@ -751,6 +755,8 @@ func TestRemoveDuplicatesUniformly(t *testing.T) {
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())

eventRecorder := &events.FakeRecorder{}

podEvictor := evictions.NewPodEvictor(
fakeClient,
policyv1.SchemeGroupVersion.String(),
@@ -759,6 +765,7 @@ func TestRemoveDuplicatesUniformly(t *testing.T) {
nil,
testCase.nodes,
false,
eventRecorder,
)

evictorFilter := evictions.NewEvictorFilter(

@@ -10,6 +10,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/events"

"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
@@ -268,6 +269,8 @@ func TestRemoveFailedPods(t *testing.T) {
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())

eventRecorder := &events.FakeRecorder{}

podEvictor := evictions.NewPodEvictor(
fakeClient,
policyv1.SchemeGroupVersion.String(),
@@ -276,6 +279,7 @@ func TestRemoveFailedPods(t *testing.T) {
nil,
tc.nodes,
false,
eventRecorder,
)

evictorFilter := evictions.NewEvictorFilter(

@@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/events"

"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
@@ -215,6 +216,8 @@ func TestRemovePodsViolatingNodeAffinity(t *testing.T) {
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())

eventRecorder := &events.FakeRecorder{}

podEvictor := evictions.NewPodEvictor(
fakeClient,
policyv1.SchemeGroupVersion.String(),
@@ -223,6 +226,7 @@ func TestRemovePodsViolatingNodeAffinity(t *testing.T) {
tc.maxNoOfPodsToEvictPerNamespace,
tc.nodes,
false,
eventRecorder,
)

nodeFit := false

@@ -28,6 +28,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/events"

"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
@@ -500,6 +501,8 @@ func TestHighNodeUtilization(t *testing.T) {
// return true, nil, fmt.Errorf("Wrong node: %v", getAction.GetName())
//})

eventRecorder := &events.FakeRecorder{}

podEvictor := evictions.NewPodEvictor(
fakeClient,
"v1",
@@ -508,6 +511,7 @@ func TestHighNodeUtilization(t *testing.T) {
nil,
testCase.nodes,
false,
eventRecorder,
)

strategy := api.DeschedulerStrategy{
@@ -712,6 +716,8 @@ func TestHighNodeUtilizationWithTaints(t *testing.T) {
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())

eventRecorder := &events.FakeRecorder{}

podEvictor := evictions.NewPodEvictor(
fakeClient,
"policy/v1",
@@ -720,6 +726,7 @@ func TestHighNodeUtilizationWithTaints(t *testing.T) {
nil,
item.nodes,
false,
eventRecorder,
)

evictorFilter := evictions.NewEvictorFilter(

@@ -29,6 +29,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/events"

"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
@@ -765,6 +766,8 @@ func TestLowNodeUtilization(t *testing.T) {
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())

eventRecorder := &events.FakeRecorder{}

podEvictor := evictions.NewPodEvictor(
fakeClient,
policyv1.SchemeGroupVersion.String(),
@@ -773,6 +776,7 @@ func TestLowNodeUtilization(t *testing.T) {
nil,
test.nodes,
false,
eventRecorder,
)

strategy := api.DeschedulerStrategy{
@@ -1086,6 +1090,8 @@ func TestLowNodeUtilizationWithTaints(t *testing.T) {
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())

eventRecorder := &events.FakeRecorder{}

podEvictor := evictions.NewPodEvictor(
fakeClient,
policyv1.SchemeGroupVersion.String(),
@@ -1094,6 +1100,7 @@ func TestLowNodeUtilizationWithTaints(t *testing.T) {
nil,
item.nodes,
false,
eventRecorder,
)

evictorFilter := evictions.NewEvictorFilter(

@@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/events"

"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
@@ -211,6 +212,8 @@ func TestPodAntiAffinity(t *testing.T) {
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())

eventRecorder := &events.FakeRecorder{}

podEvictor := evictions.NewPodEvictor(
fakeClient,
policyv1.SchemeGroupVersion.String(),
@@ -219,6 +222,7 @@ func TestPodAntiAffinity(t *testing.T) {
test.maxNoOfPodsToEvictPerNamespace,
test.nodes,
false,
eventRecorder,
)
strategy := api.DeschedulerStrategy{
Params: &api.StrategyParameters{

@@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/events"

"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
@@ -389,6 +390,8 @@ func TestPodLifeTime(t *testing.T) {
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())

eventRecorder := &events.FakeRecorder{}

podEvictor := evictions.NewPodEvictor(
fakeClient,
policyv1.SchemeGroupVersion.String(),
@@ -397,6 +400,7 @@ func TestPodLifeTime(t *testing.T) {
tc.maxPodsToEvictPerNamespace,
tc.nodes,
false,
eventRecorder,
)

evictorFilter := evictions.NewEvictorFilter(

@@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/events"

"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
@@ -243,6 +244,8 @@ func TestRemovePodsHavingTooManyRestarts(t *testing.T) {
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())

eventRecorder := &events.FakeRecorder{}

podEvictor := evictions.NewPodEvictor(
fakeClient,
policyv1.SchemeGroupVersion.String(),
@@ -251,6 +254,7 @@ func TestRemovePodsHavingTooManyRestarts(t *testing.T) {
tc.maxNoOfPodsToEvictPerNamespace,
tc.nodes,
false,
eventRecorder,
)

evictorFilter := evictions.NewEvictorFilter(

@@ -13,6 +13,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/events"

"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
@@ -1215,6 +1216,8 @@ func TestTopologySpreadConstraint(t *testing.T) {
return false, nil, nil // fallback to the default reactor
})

eventRecorder := &events.FakeRecorder{}

podEvictor := evictions.NewPodEvictor(
fakeClient,
"v1",
@@ -1223,6 +1226,7 @@ func TestTopologySpreadConstraint(t *testing.T) {
nil,
tc.nodes,
false,
eventRecorder,
)

nodeFit := false

@@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/events"

"sigs.k8s.io/descheduler/pkg/apis/componentconfig"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
@@ -349,6 +350,8 @@ func TestDeletePodsViolatingNodeTaints(t *testing.T) {
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())

eventRecorder := &events.FakeRecorder{}

podEvictor := evictions.NewPodEvictor(
fakeClient,
policyv1.SchemeGroupVersion.String(),
@@ -357,6 +360,7 @@ func TestDeletePodsViolatingNodeTaints(t *testing.T) {
tc.maxNoOfPodsToEvictPerNamespace,
tc.nodes,
false,
eventRecorder,
)

handle := &frameworkfake.HandleImpl{

pkg/utils/events.go (new file, 15 lines)
@@ -0,0 +1,15 @@
package utils

import (
"context"

clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/events"
)

func GetRecorderAndBroadcaster(ctx context.Context, clientset clientset.Interface) (events.EventBroadcasterAdapter, events.EventRecorder) {
eventBroadcaster := events.NewEventBroadcasterAdapter(clientset)
eventBroadcaster.StartRecordingToSink(ctx.Done())
eventRecorder := eventBroadcaster.NewRecorder("sigs.k8s.io.descheduler")
return eventBroadcaster, eventRecorder
}
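The new helper above is what RunDeschedulerStrategies calls: events.NewEventBroadcasterAdapter inspects the client and records through the events.k8s.io API when the server supports it, falling back to core/v1 events otherwise, which is why the RBAC rule earlier in this diff now grants create/update on the events.k8s.io group. A minimal consumer, mirroring the wiring in the strategy loop (the helper and import paths come from this PR; the surrounding function and dry-run framing are illustrative):

    package main

    import (
        "context"

        v1 "k8s.io/api/core/v1"
        fakeclientset "k8s.io/client-go/kubernetes/fake"

        "sigs.k8s.io/descheduler/pkg/utils"
    )

    func dryRunEvents(ctx context.Context, pod *v1.Pod) {
        // In dry-run mode the descheduler points the broadcaster at a fake clientset,
        // so events are recorded in memory only and never reach the API server.
        eventClient := fakeclientset.NewSimpleClientset()
        eventBroadcaster, eventRecorder := utils.GetRecorderAndBroadcaster(ctx, eventClient)
        defer eventBroadcaster.Shutdown()

        eventRecorder.Eventf(pod, nil, v1.EventTypeNormal, "PodLifeTime", "Descheduled",
            "pod evicted by sigs.k8s.io/descheduler")
    }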
@@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/events"
"k8s.io/utils/pointer"
deschedulerapi "sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
@@ -138,6 +139,9 @@ func TestRemoveDuplicates(t *testing.T) {
if err != nil || len(evictionPolicyGroupVersion) == 0 {
t.Fatalf("Error creating eviction policy group %v", err)
}

eventRecorder := &events.FakeRecorder{}

podEvictor := evictions.NewPodEvictor(
clientSet,
evictionPolicyGroupVersion,
@@ -146,6 +150,7 @@ func TestRemoveDuplicates(t *testing.T) {
nil,
nodes,
false,
eventRecorder,
)

t.Log("Running DeschedulerStrategy strategy")

@@ -36,6 +36,7 @@ import (
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/events"
v1qos "k8s.io/kubectl/pkg/util/qos"
"k8s.io/utils/pointer"

@@ -107,7 +108,7 @@ func RcByNameContainer(name, namespace string, replicas int32, labels map[string
}

func initializeClient(t *testing.T) (clientset.Interface, coreinformers.NodeInformer, podutil.GetPodsAssignedToNodeFunc, chan struct{}) {
clientSet, err := client.CreateClient(os.Getenv("KUBECONFIG"))
clientSet, err := client.CreateClient(os.Getenv("KUBECONFIG"), "")
if err != nil {
t.Errorf("Error during client creation with %v", err)
}
@@ -193,6 +194,8 @@ func runPodLifetimeStrategy(
t.Fatalf("Failed to get threshold priority from strategy's params")
}

eventRecorder := &events.FakeRecorder{}

strategies.PodLifeTime(
ctx,
clientset,
@@ -206,6 +209,7 @@ func runPodLifetimeStrategy(
maxPodsToEvictPerNamespace,
nodes,
false,
eventRecorder,
),
evictions.NewEvictorFilter(
nodes,
@@ -1035,7 +1039,7 @@ func TestPodLifeTimeOldestEvicted(t *testing.T) {

func TestDeschedulingInterval(t *testing.T) {
ctx := context.Background()
clientSet, err := client.CreateClient(os.Getenv("KUBECONFIG"))
clientSet, err := client.CreateClient(os.Getenv("KUBECONFIG"), "")
if err != nil {
t.Errorf("Error during client creation with %v", err)
}
@@ -1424,6 +1428,9 @@ func initPodEvictorOrFail(t *testing.T, clientSet clientset.Interface, getPodsAs
if err != nil || len(evictionPolicyGroupVersion) == 0 {
t.Fatalf("Error creating eviction policy group: %v", err)
}

eventRecorder := &events.FakeRecorder{}

return evictions.NewPodEvictor(
clientSet,
evictionPolicyGroupVersion,
@@ -1432,5 +1439,6 @@ func initPodEvictorOrFail(t *testing.T, clientSet clientset.Interface, getPodsAs
nil,
nodes,
false,
eventRecorder,
)
}

@@ -28,6 +28,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/events"
"k8s.io/utils/pointer"
deschedulerapi "sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
@@ -131,6 +132,9 @@ func TestTooManyRestarts(t *testing.T) {
if err != nil || len(evictionPolicyGroupVersion) == 0 {
t.Fatalf("Error creating eviction policy group: %v", err)
}

eventRecorder := &events.FakeRecorder{}

podEvictor := evictions.NewPodEvictor(
clientSet,
evictionPolicyGroupVersion,
@@ -139,6 +143,7 @@ func TestTooManyRestarts(t *testing.T) {
nil,
nodes,
false,
eventRecorder,
)

// Run RemovePodsHavingTooManyRestarts strategy