mirror of https://github.com/kubernetes-sigs/descheduler.git

e2e: TestLowNodeUtilization: normalize nodes before running the strategy

This commit is contained in:
Jan Chaloupka
2021-03-28 21:36:19 +02:00
parent 84b174e841
commit 079bd6157b
2 changed files with 403 additions and 7 deletions
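In short: the test previously relied on whatever pod placement kubeadm produced; after this change, every worker node is first padded with filler pods up to a common requested-resource fraction, then pods consuming ~10% of allocatable CPU each are pinned to a single node, and only then is the LowNodeUtilization strategy run and asserted against. A condensed sketch of the new flow, using the helper names from the diff below:

// Normalize the workers so the strategy sees a known starting state.
cleanUp, err := createBalancedPodForNodes(t, ctx, clientSet, testNamespace.Name, workerNodes, 0.5)
// Pin 4 pods, each requesting ~10% of allocatable CPU, to workerNodes[0].
// Run LowNodeUtilization (Thresholds: CPU 70, TargetThresholds: CPU 80).
// Assert that workerNodes[0] ends up with fewer evictable pods than before.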


@@ -18,6 +18,7 @@ package e2e
import (
"context"
"fmt"
"math"
"os"
"sort"
@@ -30,11 +31,13 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
v1qos "k8s.io/kubectl/pkg/util/qos"
"sigs.k8s.io/descheduler/cmd/descheduler/app/options"
"sigs.k8s.io/descheduler/pkg/api"
@@ -46,6 +49,7 @@ import (
nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
"sigs.k8s.io/descheduler/pkg/descheduler/strategies"
"sigs.k8s.io/descheduler/pkg/utils"
)
func MakePodSpec(priorityClassName string, gracePeriod *int64) v1.PodSpec {
@@ -231,7 +235,7 @@ func intersectStrings(lista, listb []string) []string {
func TestLowNodeUtilization(t *testing.T) {
ctx := context.Background()
clientSet, nodeInformer, stopCh := initializeClient(t)
clientSet, _, stopCh := initializeClient(t)
defer close(stopCh)
nodeList, err := clientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
@@ -240,24 +244,148 @@ func TestLowNodeUtilization(t *testing.T) {
}
var nodes []*v1.Node
var workerNodes []*v1.Node
for i := range nodeList.Items {
node := nodeList.Items[i]
nodes = append(nodes, &node)
if _, exists := node.Labels["node-role.kubernetes.io/master"]; !exists {
workerNodes = append(workerNodes, &node)
}
}
t.Log("Creating testing namespace")
testNamespace := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "e2e-" + strings.ToLower(t.Name())}}
if _, err := clientSet.CoreV1().Namespaces().Create(ctx, testNamespace, metav1.CreateOptions{}); err != nil {
t.Fatalf("Unable to create ns %v", testNamespace.Name)
}
defer clientSet.CoreV1().Namespaces().Delete(ctx, testNamespace.Name, metav1.DeleteOptions{})
rc := RcByNameContainer("test-rc-node-utilization", testNamespace.Name, int32(15), map[string]string{"test": "node-utilization"}, nil, "")
if _, err := clientSet.CoreV1().ReplicationControllers(rc.Namespace).Create(ctx, rc, metav1.CreateOptions{}); err != nil {
t.Errorf("Error creating deployment %v", err)
// Make all worker nodes resource balanced
cleanUp, err := createBalancedPodForNodes(t, ctx, clientSet, testNamespace.Name, workerNodes, 0.5)
if err != nil {
t.Fatalf("Unable to create load balancing pods: %v", err)
}
defer cleanUp()
t.Log("Creating pods each consuming 10% of node's allocatable")
nodeCpu := workerNodes[0].Status.Allocatable[v1.ResourceCPU]
tenthOfCpu := int64(float64((&nodeCpu).MilliValue()) * 0.1)
t.Log("Creating pods all bound to a single node")
for i := 0; i < 4; i++ {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("lnu-pod-%v", i),
Namespace: testNamespace.Name,
Labels: map[string]string{"test": "node-utilization", "name": "test-rc-node-utilization"},
},
Spec: v1.PodSpec{
Containers: []v1.Container{{
Name: "pause",
ImagePullPolicy: "Never",
Image: "kubernetes/pause",
Ports: []v1.ContainerPort{{ContainerPort: 80}},
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(tenthOfCpu, resource.DecimalSI),
},
Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(tenthOfCpu, resource.DecimalSI),
},
},
}},
Affinity: &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchFields: []v1.NodeSelectorRequirement{
{Key: "metadata.name", Operator: v1.NodeSelectorOpIn, Values: []string{workerNodes[0].Name}},
},
},
},
},
},
},
},
}
t.Logf("Creating pod %v in %v namespace for node %v", pod.Name, pod.Namespace, workerNodes[0].Name)
_, err := clientSet.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
if err != nil {
t.Logf("Error creating LNU pods: %v", err)
if err = clientSet.CoreV1().Pods(pod.Namespace).DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(labels.Set(map[string]string{"test": "node-utilization", "name": "test-rc-node-utilization"})).String(),
}); err != nil {
t.Fatalf("Unable to delete LNU pods: %v", err)
}
return
}
}
evictPods(ctx, t, clientSet, nodeInformer, nodes, rc)
deleteRC(ctx, t, clientSet, rc)
t.Log("Creating RC with 4 replicas owning the created pods")
rc := RcByNameContainer("test-rc-node-utilization", testNamespace.Name, int32(4), map[string]string{"test": "node-utilization"}, nil, "")
if _, err := clientSet.CoreV1().ReplicationControllers(rc.Namespace).Create(ctx, rc, metav1.CreateOptions{}); err != nil {
t.Fatalf("Error creating RC %v", err)
}
defer deleteRC(ctx, t, clientSet, rc)
waitForRCPodsRunning(ctx, t, clientSet, rc)
// Run LowNodeUtilization strategy
evictionPolicyGroupVersion, err := eutils.SupportEviction(clientSet)
if err != nil || len(evictionPolicyGroupVersion) == 0 {
klog.Fatalf("%v", err)
}
podEvictor := evictions.NewPodEvictor(
clientSet,
evictionPolicyGroupVersion,
false, // dryRun
0, // maxPodsToEvictPerNode (0 = no limit)
nodes,
true, // evictLocalStoragePods
false, // evictSystemCriticalPods
false, // ignorePvcPods
)
podsOnMostUtilizedNode, err := podutil.ListPodsOnANode(ctx, clientSet, workerNodes[0], podutil.WithFilter(podEvictor.Evictable().IsEvictable))
if err != nil {
t.Errorf("Error listing pods on node %v: %v", workerNodes[0].Name, err)
}
podsBefore := len(podsOnMostUtilizedNode)
t.Log("Running LowNodeUtilization strategy")
strategies.LowNodeUtilization(
ctx,
clientSet,
deschedulerapi.DeschedulerStrategy{
Enabled: true,
Params: &deschedulerapi.StrategyParameters{
NodeResourceUtilizationThresholds: &deschedulerapi.NodeResourceUtilizationThresholds{
Thresholds: deschedulerapi.ResourceThresholds{
v1.ResourceCPU: 70,
},
TargetThresholds: deschedulerapi.ResourceThresholds{
v1.ResourceCPU: 80,
},
},
},
},
workerNodes,
podEvictor,
)
waitForTerminatingPodsToDisappear(ctx, t, clientSet, rc.Namespace)
podsOnMostUtilizedNode, err = podutil.ListPodsOnANode(ctx, clientSet, workerNodes[0], podutil.WithFilter(podEvictor.Evictable().IsEvictable))
if err != nil {
t.Errorf("Error listing pods on node %v: %v", workerNodes[0].Name, err)
}
podsAfter := len(podsOnMostUtilizedNode)
if podsAfter >= podsBefore {
t.Fatalf("No pod has been evicted from %v node", workerNodes[0].Name)
}
t.Logf("Number of pods on node %v changed from %v to %v", workerNodes[0].Name, podsBefore, podsAfter)
}
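For intuition, the arithmetic behind the final assertion (fractions are of requested CPU and approximate, since the balancing pads requests, not live usage):

// workerNodes[0]: balanced to ~50%, plus 4 pods x ~10% each => ~90% requested,
// above the 80% TargetThreshold, so the node is classified over-utilized.
// The remaining workers sit at ~50%, below the 70% Threshold, so they are
// under-utilized and eligible to receive the evicted pods.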
// TODO(jchaloup): add testcases for two included/excluded namespaces
@@ -843,6 +971,48 @@ func TestDeschedulingInterval(t *testing.T) {
}
}
func waitForRCPodsRunning(ctx context.Context, t *testing.T, clientSet clientset.Interface, rc *v1.ReplicationController) {
if err := wait.PollImmediate(5*time.Second, 30*time.Second, func() (bool, error) {
podList, err := clientSet.CoreV1().Pods(rc.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(labels.Set(rc.Spec.Template.ObjectMeta.Labels)).String(),
})
if err != nil {
return false, err
}
if len(podList.Items) != int(*rc.Spec.Replicas) {
t.Logf("Waiting for %v pods to be created, got %v instead", *rc.Spec.Replicas, len(podList.Items))
return false, nil
}
for _, pod := range podList.Items {
if pod.Status.Phase != v1.PodRunning {
t.Logf("Pod %v not running yet, is %v instead", pod.Name, pod.Status.Phase)
return false, nil
}
}
return true, nil
}); err != nil {
t.Fatalf("Error waiting for pods running: %v", err)
}
}
func waitForTerminatingPodsToDisappear(ctx context.Context, t *testing.T, clientSet clientset.Interface, namespace string) {
if err := wait.PollImmediate(5*time.Second, 30*time.Second, func() (bool, error) {
podList, err := clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return false, err
}
for _, pod := range podList.Items {
if pod.DeletionTimestamp != nil {
t.Logf("Pod %v still terminating", pod.Name)
return false, nil
}
}
return true, nil
}); err != nil {
t.Fatalf("Error waiting for terminating pods to disappear: %v", err)
}
}
func deleteRC(ctx context.Context, t *testing.T, clientSet clientset.Interface, rc *v1.ReplicationController) {
// Set the number of replicas to 0.
rcdeepcopy := rc.DeepCopy()
@@ -935,3 +1105,229 @@ func evictPods(ctx context.Context, t *testing.T, clientSet clientset.Interface,
t.Fatalf("We should have see more pods on this node as per kubeadm's way of installing %v, %v", podsBefore, podsAfter)
}
}
var balancePodLabel = map[string]string{"podname": "priority-balanced-memory"}
// Track the minimum memory limit based on the cri-o minimum; pods cannot set a limit lower than this.
// see: https://github.com/cri-o/cri-o/blob/29805b13e9a43d9d22628553db337ce1c1bec0a8/internal/config/cgmgr/cgmgr.go#L23
// see: https://bugzilla.redhat.com/show_bug.cgi?id=1595256
var crioMinMemLimit = 12 * 1024 * 1024
var podRequestedResource = &v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceMemory: resource.MustParse("100Mi"),
v1.ResourceCPU: resource.MustParse("100m"),
},
Requests: v1.ResourceList{
v1.ResourceMemory: resource.MustParse("100Mi"),
v1.ResourceCPU: resource.MustParse("100m"),
},
}
// createBalancedPodForNodes creates a pod per node that asks for enough resources to make all nodes have the same mem/cpu usage ratio.
// TODO(jchaloup): This function is an updated version of the one at https://github.com/kubernetes/kubernetes/blob/84483a5/test/e2e/scheduling/priorities.go#L478.
// Import it once the function is moved to the k8s.io/component-helpers repo and modified to work for both the priority and predicates cases. (A worked example of the padding arithmetic follows the function body.)
func createBalancedPodForNodes(
t *testing.T,
ctx context.Context,
cs clientset.Interface,
ns string,
nodes []*v1.Node,
ratio float64,
) (func(), error) {
cleanUp := func() {
// Delete all remaining pods
err := cs.CoreV1().Pods(ns).DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(labels.Set(balancePodLabel)).String(),
})
if err != nil {
t.Logf("Failed to delete memory balanced pods: %v.", err)
} else {
err := wait.PollImmediate(2*time.Second, time.Minute, func() (bool, error) {
podList, err := cs.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(labels.Set(balancePodLabel)).String(),
})
if err != nil {
t.Logf("Failed to list memory balanced pods: %v.", err)
return false, nil
}
if len(podList.Items) > 0 {
return false, nil
}
return true, nil
})
if err != nil {
t.Logf("Failed to wait until all memory balanced pods are deleted: %v.", err)
}
}
}
// Find the max fraction across nodes: if some node already exceeds the ratio parameter, use that max; otherwise use the ratio.
var maxCPUFraction, maxMemFraction float64 = ratio, ratio
var cpuFractionMap = make(map[string]float64)
var memFractionMap = make(map[string]float64)
for _, node := range nodes {
cpuFraction, memFraction, _, _ := computeCPUMemFraction(t, cs, node, podRequestedResource)
cpuFractionMap[node.Name] = cpuFraction
memFractionMap[node.Name] = memFraction
if cpuFraction > maxCPUFraction {
maxCPUFraction = cpuFraction
}
if memFraction > maxMemFraction {
maxMemFraction = memFraction
}
}
// Use the max so every node ends up with the same CPU/memory usage ratio.
ratio = math.Max(maxCPUFraction, maxMemFraction)
for _, node := range nodes {
memAllocatable, found := node.Status.Allocatable[v1.ResourceMemory]
if !found {
t.Fatalf("Failed to get node's Allocatable[v1.ResourceMemory]")
}
memAllocatableVal := memAllocatable.Value()
cpuAllocatable, found := node.Status.Allocatable[v1.ResourceCPU]
if !found {
t.Fatalf("Failed to get node's Allocatable[v1.ResourceCPU]")
}
cpuAllocatableMil := cpuAllocatable.MilliValue()
needCreateResource := v1.ResourceList{}
cpuFraction := cpuFractionMap[node.Name]
memFraction := memFractionMap[node.Name]
needCreateResource[v1.ResourceCPU] = *resource.NewMilliQuantity(int64((ratio-cpuFraction)*float64(cpuAllocatableMil)), resource.DecimalSI)
// Add crioMinMemLimit so every pod sets at least that much as its limit, while keeping the ratios the same.
needCreateResource[v1.ResourceMemory] = *resource.NewQuantity(int64((ratio-memFraction)*float64(memAllocatableVal)+float64(crioMinMemLimit)), resource.BinarySI)
var gracePeriod = int64(1)
// Don't set OwnerReferences to avoid pod eviction
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "filler-pod-" + string(uuid.NewUUID()),
Namespace: ns,
Labels: balancePodLabel,
},
Spec: v1.PodSpec{
Affinity: &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchFields: []v1.NodeSelectorRequirement{
{Key: "metadata.name", Operator: v1.NodeSelectorOpIn, Values: []string{node.Name}},
},
},
},
},
},
},
Containers: []v1.Container{
{
Name: "pause",
Image: "kubernetes/pause",
Resources: v1.ResourceRequirements{
Limits: needCreateResource,
Requests: needCreateResource,
},
},
},
// PriorityClassName: conf.PriorityClassName,
TerminationGracePeriodSeconds: &gracePeriod,
},
}
t.Logf("Creating pod %v in %v namespace for node %v", pod.Name, pod.Namespace, node.Name)
_, err := cs.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
if err != nil {
t.Logf("Error creating filler pod: %v", err)
return cleanUp, err
}
waitForPodRunning(ctx, t, cs, pod)
}
for _, node := range nodes {
t.Log("Compute Cpu, Mem Fraction after create balanced pods.")
computeCPUMemFraction(t, cs, node, podRequestedResource)
}
return cleanUp, nil
}
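A minimal worked example of the padding computation above, with hypothetical numbers:

// A node with 4000m allocatable CPU at cpuFraction 0.3, raised to ratio 0.5:
cpuPad := int64((0.5 - 0.3) * 4000) // filler pod requests 800m CPU
// A node with 8Gi allocatable memory at memFraction 0.4, raised to 0.5:
memPad := int64(0.1*8*1024*1024*1024) + int64(crioMinMemLimit) // ~831Mi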
func computeCPUMemFraction(t *testing.T, cs clientset.Interface, node *v1.Node, resourceReq *v1.ResourceRequirements) (float64, float64, int64, int64) {
t.Logf("ComputeCPUMemFraction for node: %v", node.Name)
totalRequestedCPUResource := resourceReq.Requests.Cpu().MilliValue()
totalRequestedMemResource := resourceReq.Requests.Memory().Value()
allpods, err := cs.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{})
if err != nil {
t.Fatalf("Expect error of invalid, got : %v", err)
}
for _, pod := range allpods.Items {
if pod.Spec.NodeName == node.Name {
req, _ := utils.PodRequestsAndLimits(&pod)
if _, ok := req[v1.ResourceCPU]; !ok {
req[v1.ResourceCPU] = *resource.NewMilliQuantity(0, resource.DecimalSI)
}
if _, ok := req[v1.ResourceMemory]; !ok {
req[v1.ResourceMemory] = *resource.NewQuantity(0, resource.BinarySI)
}
cpuRequest := req[v1.ResourceCPU]
memoryRequest := req[v1.ResourceMemory]
t.Logf("Pod for on the node: %v, Cpu: %v, Mem: %v", pod.Name, (&cpuRequest).MilliValue(), (&memoryRequest).Value())
// Ignore best-effort pods while computing fractions, as the scheduler does not take them into account.
if v1qos.GetPodQOS(&pod) == v1.PodQOSBestEffort {
continue
}
totalRequestedCPUResource += (&cpuRequest).MilliValue()
totalRequestedMemResource += (&memoryRequest).Value()
}
}
cpuAllocatable, found := node.Status.Allocatable[v1.ResourceCPU]
if !found {
t.Fatalf("Failed to get node's Allocatable[v1.ResourceCPU]")
}
cpuAllocatableMil := cpuAllocatable.MilliValue()
floatOne := float64(1)
cpuFraction := float64(totalRequestedCPUResource) / float64(cpuAllocatableMil)
if cpuFraction > floatOne {
cpuFraction = floatOne
}
memAllocatable, found := node.Status.Allocatable[v1.ResourceMemory]
if !found {
t.Fatalf("Failed to get node's Allocatable[v1.ResourceMemory]")
}
memAllocatableVal := memAllocatable.Value()
memFraction := float64(totalRequestedMemResource) / float64(memAllocatableVal)
if memFraction > floatOne {
memFraction = floatOne
}
t.Logf("Node: %v, totalRequestedCPUResource: %v, cpuAllocatableMil: %v, cpuFraction: %v", node.Name, totalRequestedCPUResource, cpuAllocatableMil, cpuFraction)
t.Logf("Node: %v, totalRequestedMemResource: %v, memAllocatableVal: %v, memFraction: %v", node.Name, totalRequestedMemResource, memAllocatableVal, memFraction)
return cpuFraction, memFraction, cpuAllocatableMil, memAllocatableVal
}
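The fraction computed here is simply total requests over allocatable, seeded with the prospective pod's own request and clamped at 1.0. With hypothetical numbers:

// 100m from podRequestedResource plus 1900m already requested on the node,
// against 8000m allocatable CPU => cpuFraction = 2000.0/8000.0 = 0.25.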
func waitForPodRunning(ctx context.Context, t *testing.T, clientSet clientset.Interface, pod *v1.Pod) {
if err := wait.PollImmediate(5*time.Second, 30*time.Second, func() (bool, error) {
podItem, err := clientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
if podItem.Status.Phase != v1.PodRunning {
t.Logf("Pod %v not running yet, is %v instead", podItem.Name, podItem.Status.Phase)
return false, nil
}
return true, nil
}); err != nil {
t.Fatalf("Error waiting for pod running: %v", err)
}
}


@@ -19,7 +19,7 @@ package test
import (
"fmt"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)