Mirror of https://github.com/kubernetes-sigs/descheduler.git (synced 2026-01-26 05:14:13 +01:00)
Merge pull request #249 from ingvagabund/list-node-through-informer-in-every-iteration
List nodes through informer in every iteration
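The change below drops the hand-rolled reflector-backed node lister and instead builds a shared informer once, then lists ready nodes from the informer's cache inside every wait.Until iteration, so node additions, removals, and taint updates are seen between descheduling runs. A minimal, self-contained sketch of that pattern follows (the fake clientset, node name, period, and logging are illustrative only and not part of this commit):

    package main

    import (
        "fmt"
        "time"

        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/labels"
        "k8s.io/apimachinery/pkg/util/wait"
        "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes/fake"
    )

    func main() {
        // Fake clientset seeded with a single node; the descheduler itself uses a real in-cluster client.
        client := fake.NewSimpleClientset(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n1"}})

        // One shared informer factory per process; a resync period of 0 disables periodic resyncs.
        factory := informers.NewSharedInformerFactory(client, 0)
        nodeInformer := factory.Core().V1().Nodes()
        // Touch Informer() so the factory knows it should start the node informer.
        nodeInformer.Informer()

        stopCh := make(chan struct{})
        factory.Start(stopCh)
        factory.WaitForCacheSync(stopCh)

        // List nodes from the informer cache on every iteration rather than once up front,
        // so changes to the node set (and to node taints) are picked up between runs.
        wait.Until(func() {
            nodes, err := nodeInformer.Lister().List(labels.Everything())
            if err != nil {
                close(stopCh)
                return
            }
            fmt.Printf("this iteration sees %d node(s)\n", len(nodes))
            close(stopCh) // stop after one iteration in this sketch
        }, 10*time.Second, stopCh)
    }

In this sketch, nodeInformer.Informer() is called before Start because the factory only starts informers that have already been requested.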
@@ -22,6 +22,7 @@ import (
 	"k8s.io/klog"
 
 	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/informers"
 	"sigs.k8s.io/descheduler/cmd/descheduler/app/options"
 	"sigs.k8s.io/descheduler/pkg/api"
 	"sigs.k8s.io/descheduler/pkg/descheduler/client"
@@ -46,28 +47,38 @@ func Run(rs *options.DeschedulerServer) error {
 		return fmt.Errorf("deschedulerPolicy is nil")
 	}
 
-	return RunDeschedulerStrategies(rs, deschedulerPolicy)
-}
-
-func RunDeschedulerStrategies(rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy) error {
 	evictionPolicyGroupVersion, err := eutils.SupportEviction(rs.Client)
 	if err != nil || len(evictionPolicyGroupVersion) == 0 {
 		return err
 	}
 
 	stopChannel := make(chan struct{})
-	nodes, err := nodeutil.ReadyNodes(rs.Client, rs.NodeSelector, stopChannel)
+	return RunDeschedulerStrategies(rs, deschedulerPolicy, evictionPolicyGroupVersion, stopChannel)
+}
+
+func RunDeschedulerStrategies(rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string, stopChannel chan struct{}) error {
+	sharedInformerFactory := informers.NewSharedInformerFactory(rs.Client, 0)
+	nodeInformer := sharedInformerFactory.Core().V1().Nodes()
+
+	sharedInformerFactory.Start(stopChannel)
+	sharedInformerFactory.WaitForCacheSync(stopChannel)
+
+	wait.Until(func() {
+		nodes, err := nodeutil.ReadyNodes(rs.Client, nodeInformer, rs.NodeSelector, stopChannel)
 		if err != nil {
-			return err
+			klog.V(1).Infof("Unable to get ready nodes: %v", err)
+			close(stopChannel)
+			return
 		}
 
 		if len(nodes) <= 1 {
 			klog.V(1).Infof("The cluster size is 0 or 1 meaning eviction causes service disruption or degradation. So aborting..")
-			return nil
+			close(stopChannel)
+			return
 		}
 
 		nodePodCount := utils.InitializeNodePodCount(nodes)
-	wait.Until(func() {
+
 		strategies.RemoveDuplicatePods(rs, deschedulerPolicy.Strategies["RemoveDuplicates"], evictionPolicyGroupVersion, nodes, nodePodCount)
 		strategies.LowNodeUtilization(rs, deschedulerPolicy.Strategies["LowNodeUtilization"], evictionPolicyGroupVersion, nodes, nodePodCount)
 		strategies.RemovePodsViolatingInterPodAntiAffinity(rs, deschedulerPolicy.Strategies["RemovePodsViolatingInterPodAntiAffinity"], evictionPolicyGroupVersion, nodes, nodePodCount)
pkg/descheduler/descheduler_test.go (new file, 92 lines)
@@ -0,0 +1,92 @@
+package descheduler
+
+import (
+	"fmt"
+	"strings"
+	"testing"
+	"time"
+
+	"k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+	fakeclientset "k8s.io/client-go/kubernetes/fake"
+	"sigs.k8s.io/descheduler/cmd/descheduler/app/options"
+	"sigs.k8s.io/descheduler/pkg/api"
+	"sigs.k8s.io/descheduler/test"
+)
+
+func TestTaintsUpdated(t *testing.T) {
+	n1 := test.BuildTestNode("n1", 2000, 3000, 10)
+	n2 := test.BuildTestNode("n2", 2000, 3000, 10)
+
+	p1 := test.BuildTestPod(fmt.Sprintf("pod_1_%s", n1.Name), 200, 0, n1.Name)
+	p1.ObjectMeta.OwnerReferences = []metav1.OwnerReference{
+		{},
+	}
+
+	client := fakeclientset.NewSimpleClientset(n1, n2, p1)
+	dp := &api.DeschedulerPolicy{
+		Strategies: api.StrategyList{
+			"RemovePodsViolatingNodeTaints": api.DeschedulerStrategy{
+				Enabled: true,
+			},
+		},
+	}
+
+	stopChannel := make(chan struct{})
+	defer close(stopChannel)
+
+	rs := options.NewDeschedulerServer()
+	rs.Client = client
+	rs.DeschedulingInterval = 100 * time.Millisecond
+	go func() {
+		err := RunDeschedulerStrategies(rs, dp, "v1beta1", stopChannel)
+		if err != nil {
+			t.Fatalf("Unable to run descheduler strategies: %v", err)
+		}
+	}()
+
+	// Wait for a few cycles and then verify the only pod still exists
+	time.Sleep(300 * time.Millisecond)
+	pods, err := client.CoreV1().Pods(p1.Namespace).List(metav1.ListOptions{})
+	if err != nil {
+		t.Errorf("Unable to list pods: %v", err)
+	}
+	if len(pods.Items) < 1 {
+		t.Errorf("The pod was evicted before a node was tainted")
+	}
+
+	n1WithTaint := n1.DeepCopy()
+	n1WithTaint.Spec.Taints = []v1.Taint{
+		{
+			Key:    "key",
+			Value:  "value",
+			Effect: v1.TaintEffectNoSchedule,
+		},
+	}
+
+	if _, err := client.CoreV1().Nodes().Update(n1WithTaint); err != nil {
+		t.Fatalf("Unable to update node: %v\n", err)
+	}
+
+	if err := wait.PollImmediate(100*time.Millisecond, time.Second, func() (bool, error) {
+		// Get on an evicted pod results in a panic
+		//pods, err := client.CoreV1().Pods(p1.Namespace).Get(p1.Name, metav1.GetOptions{})
+		// List is better, it does not panic.
+		// Though once the pod is evicted, List starts to error with "can't assign or convert v1beta1.Eviction into v1.Pod"
+		pods, err := client.CoreV1().Pods(p1.Namespace).List(metav1.ListOptions{})
+		if err == nil {
+			if len(pods.Items) > 0 {
+				return false, nil
+			}
+			return true, nil
+		}
+		if strings.Contains(err.Error(), "can't assign or convert v1beta1.Eviction into v1.Pod") {
+			return true, nil
+		}
+
+		return false, nil
+	}); err != nil {
+		t.Fatalf("Unable to evict pod, node taint did not get propagated to descheduler strategies")
+	}
+}
@@ -17,35 +17,28 @@ limitations under the License.
 package node
 
 import (
-	"time"
-
 	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
+	coreinformers "k8s.io/client-go/informers/core/v1"
 	clientset "k8s.io/client-go/kubernetes"
-	corelisters "k8s.io/client-go/listers/core/v1"
-	"k8s.io/client-go/tools/cache"
 	"k8s.io/klog"
 	"sigs.k8s.io/descheduler/pkg/utils"
 )
 
 // ReadyNodes returns ready nodes irrespective of whether they are
 // schedulable or not.
-func ReadyNodes(client clientset.Interface, nodeSelector string, stopChannel <-chan struct{}) ([]*v1.Node, error) {
+func ReadyNodes(client clientset.Interface, nodeInformer coreinformers.NodeInformer, nodeSelector string, stopChannel <-chan struct{}) ([]*v1.Node, error) {
 	ns, err := labels.Parse(nodeSelector)
 	if err != nil {
 		return []*v1.Node{}, err
 	}
 
 	var nodes []*v1.Node
-	nl := GetNodeLister(client, stopChannel)
-	if nl != nil {
 	// err is defined above
-		if nodes, err = nl.List(ns); err != nil {
+	if nodes, err = nodeInformer.Lister().List(ns); err != nil {
 		return []*v1.Node{}, err
 	}
-	}
 
 	if len(nodes) == 0 {
 		klog.V(2).Infof("node lister returned empty list, now fetch directly")
@@ -74,22 +67,6 @@ func ReadyNodes(client clientset.Interface, nodeSelector string, stopChannel <-c
 	return readyNodes, nil
 }
 
-func GetNodeLister(client clientset.Interface, stopChannel <-chan struct{}) corelisters.NodeLister {
-	if stopChannel == nil {
-		return nil
-	}
-	listWatcher := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "nodes", v1.NamespaceAll, fields.Everything())
-	store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
-	nodeLister := corelisters.NewNodeLister(store)
-	reflector := cache.NewReflector(listWatcher, &v1.Node{}, store, time.Hour)
-	go reflector.Run(stopChannel)
-
-	// To give some time so that listing works, chosen randomly
-	time.Sleep(100 * time.Millisecond)
-
-	return nodeLister
-}
-
 // IsReady checks if the descheduler could run against given node.
 func IsReady(node *v1.Node) bool {
 	for i := range node.Status.Conditions {
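The GetNodeLister helper removed above started its own reflector and then slept for 100ms ("chosen randomly") before trusting the lister; the informer-based replacement can instead wait for the cache to sync explicitly. A small sketch of that idea under the same client-go assumptions (the fake clientset and node name are illustrative, not part of the commit):

    package main

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes/fake"
        "k8s.io/client-go/tools/cache"
    )

    func main() {
        // Fake clientset with one node, purely for illustration.
        client := fake.NewSimpleClientset(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n1"}})

        factory := informers.NewSharedInformerFactory(client, 0)
        nodeInformer := factory.Core().V1().Nodes().Informer()

        stopCh := make(chan struct{})
        defer close(stopCh)
        factory.Start(stopCh)

        // Block until the initial LIST/WATCH has populated the cache,
        // instead of sleeping for an arbitrary 100ms as the removed GetNodeLister did.
        if !cache.WaitForCacheSync(stopCh, nodeInformer.HasSynced) {
            fmt.Println("node cache never synced")
            return
        }
        fmt.Println("node cache synced; the lister can be trusted from here on")
    }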
@@ -21,6 +21,7 @@ import (
 
 	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes/fake"
 	"sigs.k8s.io/descheduler/test"
 )
@@ -62,7 +63,16 @@ func TestReadyNodesWithNodeSelector(t *testing.T) {
 
 	fakeClient := fake.NewSimpleClientset(node1, node2)
 	nodeSelector := "type=compute"
-	nodes, _ := ReadyNodes(fakeClient, nodeSelector, nil)
+
+	sharedInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
+	nodeInformer := sharedInformerFactory.Core().V1().Nodes()
+
+	stopChannel := make(chan struct{}, 0)
+	sharedInformerFactory.Start(stopChannel)
+	sharedInformerFactory.WaitForCacheSync(stopChannel)
+	defer close(stopChannel)
+
+	nodes, _ := ReadyNodes(fakeClient, nodeInformer, nodeSelector, nil)
+
 	if nodes[0].Name != "node1" {
 		t.Errorf("Expected node1, got %s", nodes[0].Name)
@@ -24,6 +24,8 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/informers"
+	coreinformers "k8s.io/client-go/informers/core/v1"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/klog"
 	"sigs.k8s.io/descheduler/cmd/descheduler/app/options"
@@ -93,7 +95,7 @@ func RcByNameContainer(name string, replicas int32, labels map[string]string, gr
 }
 
 // startEndToEndForLowNodeUtilization tests the lownode utilization strategy.
-func startEndToEndForLowNodeUtilization(clientset clientset.Interface) {
+func startEndToEndForLowNodeUtilization(clientset clientset.Interface, nodeInformer coreinformers.NodeInformer) {
 	var thresholds = make(deschedulerapi.ResourceThresholds)
 	var targetThresholds = make(deschedulerapi.ResourceThresholds)
 	thresholds[v1.ResourceMemory] = 20
@@ -108,7 +110,7 @@ func startEndToEndForLowNodeUtilization(clientset clientset.Interface) {
 		klog.Fatalf("%v", err)
 	}
 	stopChannel := make(chan struct{})
-	nodes, err := nodeutil.ReadyNodes(clientset, "", stopChannel)
+	nodes, err := nodeutil.ReadyNodes(clientset, nodeInformer, "", stopChannel)
 	if err != nil {
 		klog.Fatalf("%v", err)
 	}
@@ -132,6 +134,14 @@ func TestE2E(t *testing.T) {
 	if err != nil {
 		t.Errorf("Error listing node with %v", err)
 	}
+	sharedInformerFactory := informers.NewSharedInformerFactory(clientSet, 0)
+	nodeInformer := sharedInformerFactory.Core().V1().Nodes()
+
+	stopChannel := make(chan struct{}, 0)
+	sharedInformerFactory.Start(stopChannel)
+	sharedInformerFactory.WaitForCacheSync(stopChannel)
+	defer close(stopChannel)
+
 	// Assumption: We would have 3 node cluster by now. Kubeadm brings all the master components onto master node.
 	// So, the last node would have least utilization.
 	rc := RcByNameContainer("test-rc", int32(15), map[string]string{"test": "app"}, nil)
@@ -139,7 +149,7 @@ func TestE2E(t *testing.T) {
 	if err != nil {
 		t.Errorf("Error creating deployment %v", err)
 	}
-	evictPods(t, clientSet, nodeList, rc)
+	evictPods(t, clientSet, nodeInformer, nodeList, rc)
 
 	rc.Spec.Template.Annotations = map[string]string{"descheduler.alpha.kubernetes.io/evict": "true"}
 	rc.Spec.Replicas = func(i int32) *int32 { return &i }(15)
@@ -156,7 +166,7 @@ func TestE2E(t *testing.T) {
 	if err != nil {
 		t.Errorf("Error creating deployment %v", err)
 	}
-	evictPods(t, clientSet, nodeList, rc)
+	evictPods(t, clientSet, nodeInformer, nodeList, rc)
 }
 
 func TestDeschedulingInterval(t *testing.T) {
@@ -173,8 +183,13 @@ func TestDeschedulingInterval(t *testing.T) {
 
 	c := make(chan bool)
 	go func() {
-		err := descheduler.RunDeschedulerStrategies(s, deschedulerPolicy)
-		if err != nil {
+		evictionPolicyGroupVersion, err := eutils.SupportEviction(s.Client)
+		if err != nil || len(evictionPolicyGroupVersion) == 0 {
+			t.Errorf("Error when checking support for eviction: %v", err)
+		}
+
+		stopChannel := make(chan struct{})
+		if err := descheduler.RunDeschedulerStrategies(s, deschedulerPolicy, evictionPolicyGroupVersion, stopChannel); err != nil {
 			t.Errorf("Error running descheduler strategies: %+v", err)
 		}
 		c <- true
@@ -188,7 +203,7 @@ func TestDeschedulingInterval(t *testing.T) {
 	}
 }
 
-func evictPods(t *testing.T, clientSet clientset.Interface, nodeList *v1.NodeList, rc *v1.ReplicationController) {
+func evictPods(t *testing.T, clientSet clientset.Interface, nodeInformer coreinformers.NodeInformer, nodeList *v1.NodeList, rc *v1.ReplicationController) {
 	var leastLoadedNode v1.Node
 	podsBefore := math.MaxInt16
 	for i := range nodeList.Items {
@@ -208,7 +223,7 @@ func evictPods(t *testing.T, clientSet clientset.Interface, nodeList *v1.NodeLis
 		}
 	}
 	t.Log("Eviction of pods starting")
-	startEndToEndForLowNodeUtilization(clientSet)
+	startEndToEndForLowNodeUtilization(clientSet, nodeInformer)
 	podsOnleastUtilizedNode, err := podutil.ListEvictablePodsOnNode(clientSet, &leastLoadedNode, true)
 	if err != nil {
 		t.Errorf("Error listing pods on a node %v", err)