From 079bd6157b5d14ef2c63694700b8e58dfdcc84d6 Mon Sep 17 00:00:00 2001
From: Jan Chaloupka
Date: Sun, 28 Mar 2021 21:36:19 +0200
Subject: [PATCH] e2e: TestLowNodeUtilization: normalize nodes before running the strategy

---
 test/e2e/e2e_test.go | 408 ++++++++++++++++++++++++++++++++++++++++++-
 test/test_utils.go   |   2 +-
 2 files changed, 403 insertions(+), 7 deletions(-)

diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go
index aaa65c324..f38384452 100644
--- a/test/e2e/e2e_test.go
+++ b/test/e2e/e2e_test.go
@@ -18,6 +18,7 @@ package e2e
 
 import (
 	"context"
+	"fmt"
 	"math"
 	"os"
 	"sort"
@@ -30,11 +31,13 @@ import (
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/util/uuid"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/informers"
 	coreinformers "k8s.io/client-go/informers/core/v1"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/klog/v2"
+	v1qos "k8s.io/kubectl/pkg/util/qos"
 
 	"sigs.k8s.io/descheduler/cmd/descheduler/app/options"
 	"sigs.k8s.io/descheduler/pkg/api"
@@ -46,6 +49,7 @@ import (
 	nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
 	podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
 	"sigs.k8s.io/descheduler/pkg/descheduler/strategies"
+	"sigs.k8s.io/descheduler/pkg/utils"
 )
 
 func MakePodSpec(priorityClassName string, gracePeriod *int64) v1.PodSpec {
@@ -231,7 +235,7 @@ func intersectStrings(lista, listb []string) []string {
 
 func TestLowNodeUtilization(t *testing.T) {
 	ctx := context.Background()
-	clientSet, nodeInformer, stopCh := initializeClient(t)
+	clientSet, _, stopCh := initializeClient(t)
 	defer close(stopCh)
 
 	nodeList, err := clientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
@@ -240,24 +244,148 @@ func TestLowNodeUtilization(t *testing.T) {
 	}
 
 	var nodes []*v1.Node
+	var workerNodes []*v1.Node
 	for i := range nodeList.Items {
 		node := nodeList.Items[i]
 		nodes = append(nodes, &node)
+		if _, exists := node.Labels["node-role.kubernetes.io/master"]; !exists {
+			workerNodes = append(workerNodes, &node)
+		}
 	}
 
+	t.Log("Creating testing namespace")
 	testNamespace := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "e2e-" + strings.ToLower(t.Name())}}
 	if _, err := clientSet.CoreV1().Namespaces().Create(ctx, testNamespace, metav1.CreateOptions{}); err != nil {
 		t.Fatalf("Unable to create ns %v", testNamespace.Name)
 	}
 	defer clientSet.CoreV1().Namespaces().Delete(ctx, testNamespace.Name, metav1.DeleteOptions{})
 
-	rc := RcByNameContainer("test-rc-node-utilization", testNamespace.Name, int32(15), map[string]string{"test": "node-utilization"}, nil, "")
-	if _, err := clientSet.CoreV1().ReplicationControllers(rc.Namespace).Create(ctx, rc, metav1.CreateOptions{}); err != nil {
-		t.Errorf("Error creating deployment %v", err)
+	// Make all worker nodes resource balanced
+	cleanUp, err := createBalancedPodForNodes(t, ctx, clientSet, testNamespace.Name, workerNodes, 0.5)
+	if err != nil {
+		t.Fatalf("Unable to create load balancing pods: %v", err)
+	}
+	defer cleanUp()
+
+	t.Log("Creating pods each consuming 10% of node's allocatable")
+	nodeCpu := workerNodes[0].Status.Allocatable[v1.ResourceCPU]
+	tenthOfCpu := int64(float64((&nodeCpu).MilliValue()) * 0.1)
+
+	t.Log("Creating pods all bound to a single node")
+	for i := 0; i < 4; i++ {
+		pod := &v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      fmt.Sprintf("lnu-pod-%v", i),
+				Namespace: testNamespace.Name,
+				Labels:    map[string]string{"test": "node-utilization", "name": "test-rc-node-utilization"},
+			},
+			Spec: v1.PodSpec{
+				Containers: []v1.Container{{
+					Name:            "pause",
+					ImagePullPolicy: "Never",
+					Image:           "kubernetes/pause",
+					Ports:           []v1.ContainerPort{{ContainerPort: 80}},
+					Resources: v1.ResourceRequirements{
+						Limits: v1.ResourceList{
+							v1.ResourceCPU: *resource.NewMilliQuantity(tenthOfCpu, resource.DecimalSI),
+						},
+						Requests: v1.ResourceList{
+							v1.ResourceCPU: *resource.NewMilliQuantity(tenthOfCpu, resource.DecimalSI),
+						},
+					},
+				}},
+				Affinity: &v1.Affinity{
+					NodeAffinity: &v1.NodeAffinity{
+						RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
+							NodeSelectorTerms: []v1.NodeSelectorTerm{
+								{
+									MatchFields: []v1.NodeSelectorRequirement{
+										{Key: "metadata.name", Operator: v1.NodeSelectorOpIn, Values: []string{workerNodes[0].Name}},
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+		}
+
+		t.Logf("Creating pod %v in %v namespace for node %v", pod.Name, pod.Namespace, workerNodes[0].Name)
+		_, err := clientSet.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
+		if err != nil {
+			t.Logf("Error creating LNU pods: %v", err)
+			if err = clientSet.CoreV1().Pods(pod.Namespace).DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{
+				LabelSelector: labels.SelectorFromSet(labels.Set(map[string]string{"test": "node-utilization", "name": "test-rc-node-utilization"})).String(),
+			}); err != nil {
+				t.Fatalf("Unable to delete LNU pods: %v", err)
+			}
+			return
+		}
 	}
 
-	evictPods(ctx, t, clientSet, nodeInformer, nodes, rc)
-	deleteRC(ctx, t, clientSet, rc)
+	t.Log("Creating RC with 4 replicas owning the created pods")
+	rc := RcByNameContainer("test-rc-node-utilization", testNamespace.Name, int32(4), map[string]string{"test": "node-utilization"}, nil, "")
+	if _, err := clientSet.CoreV1().ReplicationControllers(rc.Namespace).Create(ctx, rc, metav1.CreateOptions{}); err != nil {
+		t.Fatalf("Error creating RC %v", err)
+	}
+	defer deleteRC(ctx, t, clientSet, rc)
+	waitForRCPodsRunning(ctx, t, clientSet, rc)
+
+	// Run LowNodeUtilization strategy
+	evictionPolicyGroupVersion, err := eutils.SupportEviction(clientSet)
+	if err != nil || len(evictionPolicyGroupVersion) == 0 {
+		klog.Fatalf("%v", err)
+	}
+	podEvictor := evictions.NewPodEvictor(
+		clientSet,
+		evictionPolicyGroupVersion,
+		false,
+		0,
+		nodes,
+		true,
+		false,
+		false,
+	)
+
+	podsOnMostUtilizedNode, err := podutil.ListPodsOnANode(ctx, clientSet, workerNodes[0], podutil.WithFilter(podEvictor.Evictable().IsEvictable))
+	if err != nil {
+		t.Errorf("Error listing pods on a node %v", err)
+	}
+	podsBefore := len(podsOnMostUtilizedNode)
+
+	t.Log("Running LowNodeUtilization strategy")
+	strategies.LowNodeUtilization(
+		ctx,
+		clientSet,
+		deschedulerapi.DeschedulerStrategy{
+			Enabled: true,
+			Params: &deschedulerapi.StrategyParameters{
+				NodeResourceUtilizationThresholds: &deschedulerapi.NodeResourceUtilizationThresholds{
+					Thresholds: deschedulerapi.ResourceThresholds{
+						v1.ResourceCPU: 70,
+					},
+					TargetThresholds: deschedulerapi.ResourceThresholds{
+						v1.ResourceCPU: 80,
+					},
+				},
+			},
+		},
+		workerNodes,
+		podEvictor,
+	)
+
+	waitForTerminatingPodsToDisappear(ctx, t, clientSet, rc.Namespace)
+
+	podsOnMostUtilizedNode, err = podutil.ListPodsOnANode(ctx, clientSet, workerNodes[0], podutil.WithFilter(podEvictor.Evictable().IsEvictable))
+	if err != nil {
+		t.Errorf("Error listing pods on a node %v", err)
+	}
+	podsAfter := len(podsOnMostUtilizedNode)
+
+	if podsAfter >= podsBefore {
+		t.Fatalf("No pod has been evicted from %v node", workerNodes[0].Name)
+	}
+	t.Logf("Number of pods on node %v changed from %v to %v", workerNodes[0].Name, podsBefore, podsAfter)
 }
 
 // TODO(jchaloup): add testcases for two included/excluded namespaces
@@ -843,6 +971,48 @@ func TestDeschedulingInterval(t *testing.T) {
 	}
 }
 
+func waitForRCPodsRunning(ctx context.Context, t *testing.T, clientSet clientset.Interface, rc *v1.ReplicationController) {
+	if err := wait.PollImmediate(5*time.Second, 30*time.Second, func() (bool, error) {
+		podList, err := clientSet.CoreV1().Pods(rc.Namespace).List(ctx, metav1.ListOptions{
+			LabelSelector: labels.SelectorFromSet(labels.Set(rc.Spec.Template.ObjectMeta.Labels)).String(),
+		})
+		if err != nil {
+			return false, err
+		}
+		if len(podList.Items) != int(*rc.Spec.Replicas) {
+			t.Logf("Waiting for %v pods to be created, got %v instead", *rc.Spec.Replicas, len(podList.Items))
+			return false, nil
+		}
+		for _, pod := range podList.Items {
+			if pod.Status.Phase != v1.PodRunning {
+				t.Logf("Pod %v not running yet, is %v instead", pod.Name, pod.Status.Phase)
+				return false, nil
+			}
+		}
+		return true, nil
+	}); err != nil {
+		t.Fatalf("Error waiting for pods running: %v", err)
+	}
+}
+
+func waitForTerminatingPodsToDisappear(ctx context.Context, t *testing.T, clientSet clientset.Interface, namespace string) {
+	if err := wait.PollImmediate(5*time.Second, 30*time.Second, func() (bool, error) {
+		podList, err := clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
+		if err != nil {
+			return false, err
+		}
+		for _, pod := range podList.Items {
+			if pod.DeletionTimestamp != nil {
+				t.Logf("Pod %v still terminating", pod.Name)
+				return false, nil
+			}
+		}
+		return true, nil
+	}); err != nil {
+		t.Fatalf("Error waiting for terminating pods to disappear: %v", err)
+	}
+}
+
 func deleteRC(ctx context.Context, t *testing.T, clientSet clientset.Interface, rc *v1.ReplicationController) {
 	//set number of replicas to 0
 	rcdeepcopy := rc.DeepCopy()
@@ -935,3 +1105,229 @@ func evictPods(ctx context.Context, t *testing.T, clientSet clientset.Interface,
 		t.Fatalf("We should have see more pods on this node as per kubeadm's way of installing %v, %v", podsBefore, podsAfter)
 	}
 }
+
+var balancePodLabel = map[string]string{"podname": "priority-balanced-memory"}
+
+// track min memory limit based on crio minimum. pods cannot set a limit lower than this
+// see: https://github.com/cri-o/cri-o/blob/29805b13e9a43d9d22628553db337ce1c1bec0a8/internal/config/cgmgr/cgmgr.go#L23
+// see: https://bugzilla.redhat.com/show_bug.cgi?id=1595256
+var crioMinMemLimit = 12 * 1024 * 1024
+
+var podRequestedResource = &v1.ResourceRequirements{
+	Limits: v1.ResourceList{
+		v1.ResourceMemory: resource.MustParse("100Mi"),
+		v1.ResourceCPU:    resource.MustParse("100m"),
+	},
+	Requests: v1.ResourceList{
+		v1.ResourceMemory: resource.MustParse("100Mi"),
+		v1.ResourceCPU:    resource.MustParse("100m"),
+	},
+}
+
+// createBalancedPodForNodes creates a pod per node that asks for enough resources to make all nodes have the same mem/cpu usage ratio.
+// TODO(jchaloup): The function is an updated version of the one under https://github.com/kubernetes/kubernetes/blob/84483a5/test/e2e/scheduling/priorities.go#L478.
+// Import it once the function is moved under the k8s.io/component-helpers repo and modified to work for both the priority and predicates cases.
+func createBalancedPodForNodes(
+	t *testing.T,
+	ctx context.Context,
+	cs clientset.Interface,
+	ns string,
+	nodes []*v1.Node,
+	ratio float64,
+) (func(), error) {
+	cleanUp := func() {
+		// Delete all remaining pods
+		err := cs.CoreV1().Pods(ns).DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{
+			LabelSelector: labels.SelectorFromSet(labels.Set(balancePodLabel)).String(),
+		})
+		if err != nil {
+			t.Logf("Failed to delete memory balanced pods: %v.", err)
+		} else {
+			err := wait.PollImmediate(2*time.Second, time.Minute, func() (bool, error) {
+				podList, err := cs.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{
+					LabelSelector: labels.SelectorFromSet(labels.Set(balancePodLabel)).String(),
+				})
+				if err != nil {
+					t.Logf("Failed to list memory balanced pods: %v.", err)
+					return false, nil
+				}
+				if len(podList.Items) > 0 {
+					return false, nil
+				}
+				return true, nil
+			})
+			if err != nil {
+				t.Logf("Failed to wait until all memory balanced pods are deleted: %v.", err)
+			}
+		}
+	}
+
+	// find the max fraction; if a node's fraction exceeds the ratio parameter, use that fraction, otherwise use the ratio parameter
+	var maxCPUFraction, maxMemFraction float64 = ratio, ratio
+	var cpuFractionMap = make(map[string]float64)
+	var memFractionMap = make(map[string]float64)
+
+	for _, node := range nodes {
+		cpuFraction, memFraction, _, _ := computeCPUMemFraction(t, cs, node, podRequestedResource)
+		cpuFractionMap[node.Name] = cpuFraction
+		memFractionMap[node.Name] = memFraction
+		if cpuFraction > maxCPUFraction {
+			maxCPUFraction = cpuFraction
+		}
+		if memFraction > maxMemFraction {
+			maxMemFraction = memFraction
+		}
+	}
+
+	// we need the max one so every node ends up with the same cpu/mem usage ratio
+	ratio = math.Max(maxCPUFraction, maxMemFraction)
+	for _, node := range nodes {
+		memAllocatable, found := node.Status.Allocatable[v1.ResourceMemory]
+		if !found {
+			t.Fatalf("Failed to get node's Allocatable[v1.ResourceMemory]")
+		}
+		memAllocatableVal := memAllocatable.Value()
+
+		cpuAllocatable, found := node.Status.Allocatable[v1.ResourceCPU]
+		if !found {
+			t.Fatalf("Failed to get node's Allocatable[v1.ResourceCPU]")
+		}
+		cpuAllocatableMil := cpuAllocatable.MilliValue()
+
+		needCreateResource := v1.ResourceList{}
+		cpuFraction := cpuFractionMap[node.Name]
+		memFraction := memFractionMap[node.Name]
+		needCreateResource[v1.ResourceCPU] = *resource.NewMilliQuantity(int64((ratio-cpuFraction)*float64(cpuAllocatableMil)), resource.DecimalSI)
+
+		// add crioMinMemLimit to ensure that all pods set at least that much as a limit, while keeping the same ratios
+		needCreateResource[v1.ResourceMemory] = *resource.NewQuantity(int64((ratio-memFraction)*float64(memAllocatableVal)+float64(crioMinMemLimit)), resource.BinarySI)
+
+		var gracePeriod = int64(1)
+		// Don't set OwnerReferences to avoid pod eviction
+		pod := &v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "filler-pod-" + string(uuid.NewUUID()),
+				Namespace: ns,
+				Labels:    balancePodLabel,
+			},
+			Spec: v1.PodSpec{
+				Affinity: &v1.Affinity{
+					NodeAffinity: &v1.NodeAffinity{
+						RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
+							NodeSelectorTerms: []v1.NodeSelectorTerm{
+								{
+									MatchFields: []v1.NodeSelectorRequirement{
+										{Key: "metadata.name", Operator: v1.NodeSelectorOpIn, Values: []string{node.Name}},
+									},
+								},
+							},
+						},
+					},
+				},
+				Containers: []v1.Container{
+					{
+						Name:  "pause",
+						Image: "kubernetes/pause",
+						Resources: v1.ResourceRequirements{
+							Limits:   needCreateResource,
+							Requests: needCreateResource,
+						},
+					},
+				},
+				// PriorityClassName: conf.PriorityClassName,
+				TerminationGracePeriodSeconds: &gracePeriod,
+			},
+		}
+
+		t.Logf("Creating pod %v in %v namespace for node %v", pod.Name, pod.Namespace, node.Name)
+		_, err := cs.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
+		if err != nil {
+			t.Logf("Error creating filler pod: %v", err)
+			return cleanUp, err
+		}
+
+		waitForPodRunning(ctx, t, cs, pod)
+	}
+
+	for _, node := range nodes {
+		t.Log("Compute CPU and memory fraction after creating balanced pods.")
+		computeCPUMemFraction(t, cs, node, podRequestedResource)
+	}
+
+	return cleanUp, nil
+}
+
+func computeCPUMemFraction(t *testing.T, cs clientset.Interface, node *v1.Node, resourceReq *v1.ResourceRequirements) (float64, float64, int64, int64) {
+	t.Logf("ComputeCPUMemFraction for node: %v", node.Name)
+	totalRequestedCPUResource := resourceReq.Requests.Cpu().MilliValue()
+	totalRequestedMemResource := resourceReq.Requests.Memory().Value()
+	allpods, err := cs.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{})
+	if err != nil {
+		t.Fatalf("Unable to list pods: %v", err)
+	}
+	for _, pod := range allpods.Items {
+		if pod.Spec.NodeName == node.Name {
+			req, _ := utils.PodRequestsAndLimits(&pod)
+			if _, ok := req[v1.ResourceCPU]; !ok {
+				req[v1.ResourceCPU] = *resource.NewMilliQuantity(0, resource.DecimalSI)
+			}
+			if _, ok := req[v1.ResourceMemory]; !ok {
+				req[v1.ResourceMemory] = *resource.NewQuantity(0, resource.BinarySI)
+			}
+
+			cpuRequest := req[v1.ResourceCPU]
+			memoryRequest := req[v1.ResourceMemory]
+
+			t.Logf("Pod on the node: %v, Cpu: %v, Mem: %v", pod.Name, (&cpuRequest).MilliValue(), (&memoryRequest).Value())
+			// Ignore best effort pods while computing fractions as they won't be taken into account by the scheduler.
+			if v1qos.GetPodQOS(&pod) == v1.PodQOSBestEffort {
+				continue
+			}
+			totalRequestedCPUResource += (&cpuRequest).MilliValue()
+			totalRequestedMemResource += (&memoryRequest).Value()
+		}
+	}
+	cpuAllocatable, found := node.Status.Allocatable[v1.ResourceCPU]
+	if !found {
+		t.Fatalf("Failed to get node's Allocatable[v1.ResourceCPU]")
+	}
+	cpuAllocatableMil := cpuAllocatable.MilliValue()
+
+	floatOne := float64(1)
+	cpuFraction := float64(totalRequestedCPUResource) / float64(cpuAllocatableMil)
+	if cpuFraction > floatOne {
+		cpuFraction = floatOne
+	}
+	memAllocatable, found := node.Status.Allocatable[v1.ResourceMemory]
+	if !found {
+		t.Fatalf("Failed to get node's Allocatable[v1.ResourceMemory]")
+	}
+	memAllocatableVal := memAllocatable.Value()
+	memFraction := float64(totalRequestedMemResource) / float64(memAllocatableVal)
+	if memFraction > floatOne {
+		memFraction = floatOne
+	}
+
+	t.Logf("Node: %v, totalRequestedCPUResource: %v, cpuAllocatableMil: %v, cpuFraction: %v", node.Name, totalRequestedCPUResource, cpuAllocatableMil, cpuFraction)
+	t.Logf("Node: %v, totalRequestedMemResource: %v, memAllocatableVal: %v, memFraction: %v", node.Name, totalRequestedMemResource, memAllocatableVal, memFraction)
+
+	return cpuFraction, memFraction, cpuAllocatableMil, memAllocatableVal
+}
+
+func waitForPodRunning(ctx context.Context, t *testing.T, clientSet clientset.Interface, pod *v1.Pod) {
+	if err := wait.PollImmediate(5*time.Second, 30*time.Second, func() (bool, error) {
+		podItem, err := clientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
+		if err != nil {
+			return false, err
+		}
+
+		if podItem.Status.Phase != v1.PodRunning {
+			t.Logf("Pod %v not running yet, is %v instead", podItem.Name, podItem.Status.Phase)
+			return false, nil
+		}
+
+		return true, nil
+	}); err != nil {
+		t.Fatalf("Error waiting for pod running: %v", err)
+	}
+}
diff --git a/test/test_utils.go b/test/test_utils.go
index 358a443cf..bacb52702 100644
--- a/test/test_utils.go
+++ b/test/test_utils.go
@@ -19,7 +19,7 @@ package test
 import (
 	"fmt"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
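
Reviewer note (not part of the patch): the filler-pod sizing in createBalancedPodForNodes reduces to one calculation per node: raise the node's requested/allocatable fraction up to the cluster-wide maximum fraction (bounded below by the caller-supplied floor, here 0.5). The sketch below restates that arithmetic in isolation, assuming the per-node fractions and allocatable values have already been computed; the function name fillerRequest and its signature are illustrative only and do not exist in the descheduler code base.

package main

import "fmt"

// fillerRequest returns the CPU (millicores) and memory (bytes) a filler pod
// should request so that a node's requested/allocatable fraction reaches
// targetRatio. targetRatio is the max of all per-node CPU and memory fractions
// (never below the caller's floor), so the results are non-negative in practice.
// minMemBytes mirrors crioMinMemLimit: CRI-O rejects memory limits below ~12Mi,
// so that floor is always added on top of the computed memory request.
func fillerRequest(targetRatio, cpuFraction, memFraction float64,
	cpuAllocatableMilli, memAllocatableBytes, minMemBytes int64) (cpuMilli, memBytes int64) {
	cpuMilli = int64((targetRatio - cpuFraction) * float64(cpuAllocatableMilli))
	memBytes = int64((targetRatio-memFraction)*float64(memAllocatableBytes)) + minMemBytes
	return cpuMilli, memBytes
}

func main() {
	// Example: a 4-core node (4000m) with 8Gi allocatable, currently at 20% CPU
	// and 30% memory requested, being raised to the 50% floor used by the test.
	cpu, mem := fillerRequest(0.5, 0.2, 0.3, 4000, 8<<30, 12<<20)
	fmt.Printf("filler pod requests: %dm CPU, %d bytes memory\n", cpu, mem)
}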