Mirror of https://github.com/kubernetes-sigs/descheduler.git (synced 2026-01-26 05:14:13 +01:00)
chore: comment the code and simplify some things
This commit adds comments to the low and high node utilization plugins and simplifies the code a little where that was possible without affecting behavior too much.
@@ -34,73 +34,94 @@ import (

const HighNodeUtilizationPluginName = "HighNodeUtilization"

// HighNodeUtilization evicts pods from under utilized nodes so that scheduler can schedule according to its plugin.
// Note that CPU/Memory requests are used to calculate nodes' utilization and not the actual resource usage.

type HighNodeUtilization struct {
handle frameworktypes.Handle
args *HighNodeUtilizationArgs
podFilter func(pod *v1.Pod) bool
underutilizationCriteria []interface{}
resourceNames []v1.ResourceName
extendedResourceNames []v1.ResourceName
targetThresholds api.ResourceThresholds
usageClient usageClient
}

// this lines makes sure that HighNodeUtilization implements the BalancePlugin
// interface.
var _ frameworktypes.BalancePlugin = &HighNodeUtilization{}

// NewHighNodeUtilization builds plugin from its arguments while passing a handle
func NewHighNodeUtilization(args runtime.Object, handle frameworktypes.Handle) (frameworktypes.Plugin, error) {
highNodeUtilizatioArgs, ok := args.(*HighNodeUtilizationArgs)
// HighNodeUtilization evicts pods from under utilized nodes so that scheduler
// can schedule according to its plugin. Note that CPU/Memory requests are used
// to calculate nodes' utilization and not the actual resource usage.
type HighNodeUtilization struct {
handle frameworktypes.Handle
args *HighNodeUtilizationArgs
podFilter func(pod *v1.Pod) bool
criteria []any
resourceNames []v1.ResourceName
highThresholds api.ResourceThresholds
usageClient usageClient
}

// NewHighNodeUtilization builds plugin from its arguments while passing a handle.
func NewHighNodeUtilization(
genericArgs runtime.Object, handle frameworktypes.Handle,
) (frameworktypes.Plugin, error) {
args, ok := genericArgs.(*HighNodeUtilizationArgs)
if !ok {
return nil, fmt.Errorf("want args to be of type HighNodeUtilizationArgs, got %T", args)
return nil, fmt.Errorf(
"want args to be of type HighNodeUtilizationArgs, got %T",
genericArgs,
)
}

targetThresholds := make(api.ResourceThresholds)
setDefaultForThresholds(highNodeUtilizatioArgs.Thresholds, targetThresholds)
resourceNames := getResourceNames(highNodeUtilizatioArgs.Thresholds)

underutilizationCriteria := []interface{}{
"CPU", highNodeUtilizatioArgs.Thresholds[v1.ResourceCPU],
"Mem", highNodeUtilizatioArgs.Thresholds[v1.ResourceMemory],
"Pods", highNodeUtilizatioArgs.Thresholds[v1.ResourcePods],
}
for name := range highNodeUtilizatioArgs.Thresholds {
if !nodeutil.IsBasicResource(name) {
underutilizationCriteria = append(underutilizationCriteria, string(name), int64(highNodeUtilizatioArgs.Thresholds[name]))
}
// this plugins worries only about thresholds but the nodeplugins
// package was made to take two thresholds into account, one for low
// and another for high usage. here we make sure we set the high
// threshold to the maximum value for all resources for which we have a
// threshold.
highThresholds := make(api.ResourceThresholds)
for rname := range args.Thresholds {
highThresholds[rname] = MaxResourcePercentage
}

podFilter, err := podutil.NewOptions().
// criteria is a list of thresholds that are used to determine if a node
// is underutilized. it is used only for logging purposes.
criteria := []any{}
for rname, rvalue := range args.Thresholds {
criteria = append(criteria, rname, rvalue)
}

podFilter, err := podutil.
NewOptions().
WithFilter(handle.Evictor().Filter).
BuildFilterFunc()
if err != nil {
return nil, fmt.Errorf("error initializing pod filter function: %v", err)
}

extendedResourceNames := uniquifyResourceNames(
append(resourceNames, v1.ResourceCPU, v1.ResourceMemory, v1.ResourcePods),
// resourceNames is a list of all resource names this plugin cares
// about. we care about the resources for which we have a threshold and
// all we consider the basic resources (cpu, memory, pods).
resourceNames := uniquifyResourceNames(
append(
getResourceNames(args.Thresholds),
v1.ResourceCPU,
v1.ResourceMemory,
v1.ResourcePods,
),
)

return &HighNodeUtilization{
handle: handle,
args: highNodeUtilizatioArgs,
resourceNames: resourceNames,
extendedResourceNames: extendedResourceNames,
targetThresholds: targetThresholds,
underutilizationCriteria: underutilizationCriteria,
podFilter: podFilter,
usageClient: newRequestedUsageClient(extendedResourceNames, handle.GetPodsAssignedToNodeFunc()),
handle: handle,
args: args,
resourceNames: resourceNames,
highThresholds: highThresholds,
criteria: criteria,
podFilter: podFilter,
usageClient: newRequestedUsageClient(
resourceNames,
handle.GetPodsAssignedToNodeFunc(),
),
}, nil
}

// Name retrieves the plugin name
// Name retrieves the plugin name.
func (h *HighNodeUtilization) Name() string {
return HighNodeUtilizationPluginName
}

// Balance extension point implementation for the plugin
// Balance holds the main logic of the plugin. It evicts pods from under
// utilized nodes. The goal here is to concentrate pods in fewer nodes so that
// less nodes are used.
func (h *HighNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status {
if err := h.usageClient.sync(ctx, nodes); err != nil {
return &frameworktypes.Status{
@@ -108,28 +129,35 @@ func (h *HighNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *fr
}
}

// take a picture of the current state of the nodes, everything else
// here is based on this snapshot.
nodesMap, nodesUsageMap, podListMap := getNodeUsageSnapshot(nodes, h.usageClient)
capacities := referencedResourceListForNodesCapacity(nodes)

// node usages are not presented as percentages over the capacity.
// we need to normalize them to be able to compare them with the
// thresholds. thresholds are already provided by the user in
// percentage.
usage, thresholds := assessNodesUsagesAndStaticThresholds(
nodesUsageMap,
capacities,
h.args.Thresholds,
h.targetThresholds,
nodesUsageMap, capacities, h.args.Thresholds, h.highThresholds,
)

// classify nodes in two groups: underutilized and schedulable. we will
// later try to move pods from the first group to the second.
nodeGroups := classifyNodeUsage(
usage,
thresholds,
usage, thresholds,
[]classifierFnc{
// underutilized nodes
// underutilized nodes.
func(nodeName string, usage, threshold api.ResourceThresholds) bool {
return isNodeBelowThreshold(usage, threshold)
},
// every other node that is schedulable
// schedulable nodes.
func(nodeName string, usage, threshold api.ResourceThresholds) bool {
if nodeutil.IsNodeUnschedulable(nodesMap[nodeName]) {
klog.V(2).InfoS("Node is unschedulable", "node", klog.KObj(nodesMap[nodeName]))
klog.V(2).InfoS(
"Node is unschedulable",
"node", klog.KObj(nodesMap[nodeName]),
)
return false
}
return true
@@ -137,13 +165,18 @@ func (h *HighNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *fr
},
)

// convert groups node []NodeInfo
// the nodeplugin package works by means of NodeInfo structures. these
// structures hold a series of information about the nodes. now that
// we have classified the nodes, we can build the NodeInfo structures
// for each group. NodeInfo structs carry usage and available resources
// for each node.
nodeInfos := make([][]NodeInfo, 2)
category := []string{"underutilized", "overutilized"}
for i := range nodeGroups {
for nodeName := range nodeGroups[i] {
klog.InfoS(
fmt.Sprintf("Node is %s", category[i]),
"Node has been classified",
"category", category[i],
"node", klog.KObj(nodesMap[nodeName]),
"usage", nodesUsageMap[nodeName],
"usagePercentage", normalizer.Round(usage[nodeName]),
@@ -151,64 +184,73 @@ func (h *HighNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *fr
nodeInfos[i] = append(nodeInfos[i], NodeInfo{
NodeUsage: NodeUsage{
node: nodesMap[nodeName],
usage: nodesUsageMap[nodeName], // get back the original node usage
usage: nodesUsageMap[nodeName],
allPods: podListMap[nodeName],
},
thresholds: NodeThresholds{
lowResourceThreshold: resourceThresholdsToNodeUsage(thresholds[nodeName][0], capacities[nodeName], h.extendedResourceNames),
highResourceThreshold: resourceThresholdsToNodeUsage(thresholds[nodeName][1], capacities[nodeName], h.extendedResourceNames),
},
available: capNodeCapacitiesToThreshold(
nodesMap[nodeName],
thresholds[nodeName][1],
h.resourceNames,
),
})
}
}

sourceNodes := nodeInfos[0]
highNodes := nodeInfos[1]
lowNodes, schedulableNodes := nodeInfos[0], nodeInfos[1]

// log message in one line
klog.V(1).InfoS("Criteria for a node below target utilization", h.underutilizationCriteria...)
klog.V(1).InfoS("Number of underutilized nodes", "totalNumber", len(sourceNodes))
klog.V(1).InfoS("Criteria for a node below target utilization", h.criteria...)
klog.V(1).InfoS("Number of underutilized nodes", "totalNumber", len(lowNodes))

if len(sourceNodes) == 0 {
klog.V(1).InfoS("No node is underutilized, nothing to do here, you might tune your thresholds further")
if len(lowNodes) == 0 {
klog.V(1).InfoS(
"No node is underutilized, nothing to do here, you might tune your thresholds further",
)
return nil
}
if len(sourceNodes) <= h.args.NumberOfNodes {
klog.V(1).InfoS("Number of nodes underutilized is less or equal than NumberOfNodes, nothing to do here", "underutilizedNodes", len(sourceNodes), "numberOfNodes", h.args.NumberOfNodes)

if len(lowNodes) <= h.args.NumberOfNodes {
klog.V(1).InfoS(
"Number of nodes underutilized is less or equal than NumberOfNodes, nothing to do here",
"underutilizedNodes", len(lowNodes),
"numberOfNodes", h.args.NumberOfNodes,
)
return nil
}
if len(sourceNodes) == len(nodes) {

if len(lowNodes) == len(nodes) {
klog.V(1).InfoS("All nodes are underutilized, nothing to do here")
return nil
}
if len(highNodes) == 0 {

if len(schedulableNodes) == 0 {
klog.V(1).InfoS("No node is available to schedule the pods, nothing to do here")
return nil
}

// stop if the total available usage has dropped to zero - no more pods can be scheduled
continueEvictionCond := func(nodeInfo NodeInfo, totalAvailableUsage api.ReferencedResourceList) bool {
for name := range totalAvailableUsage {
if totalAvailableUsage[name].CmpInt64(0) < 1 {
// stops the eviction process if the total available capacity sage has
// dropped to zero - no more pods can be scheduled. this will signalize
// to stop if any of the available resources has dropped to zero.
continueEvictionCond := func(_ NodeInfo, avail api.ReferencedResourceList) bool {
for name := range avail {
if avail[name].CmpInt64(0) < 1 {
return false
}
}

return true
}

// Sort the nodes by the usage in ascending order
sortNodesByUsage(sourceNodes, true)
// sorts the nodes by the usage in ascending order.
sortNodesByUsage(lowNodes, true)

evictPodsFromSourceNodes(
ctx,
h.args.EvictableNamespaces,
sourceNodes,
highNodes,
lowNodes,
schedulableNodes,
h.handle.Evictor(),
evictions.EvictOptions{StrategyName: HighNodeUtilizationPluginName},
h.podFilter,
h.extendedResourceNames,
h.resourceNames,
continueEvictionCond,
h.usageClient,
nil,
@@ -216,21 +258,3 @@ func (h *HighNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *fr

return nil
}

func setDefaultForThresholds(thresholds, targetThresholds api.ResourceThresholds) {
if _, ok := thresholds[v1.ResourcePods]; ok {
targetThresholds[v1.ResourcePods] = MaxResourcePercentage
}
if _, ok := thresholds[v1.ResourceCPU]; ok {
targetThresholds[v1.ResourceCPU] = MaxResourcePercentage
}
if _, ok := thresholds[v1.ResourceMemory]; ok {
targetThresholds[v1.ResourceMemory] = MaxResourcePercentage
}

for name := range thresholds {
if !nodeutil.IsBasicResource(name) {
targetThresholds[name] = MaxResourcePercentage
}
}
}
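// Editor's illustration, not part of this commit: a minimal sketch of what the
// refactored constructor derives from a user supplied threshold set. The value
// 20 is an assumption; the names come from the code above.
//
//	thresholds := api.ResourceThresholds{v1.ResourceCPU: 20, v1.ResourceMemory: 20}
//
//	criteria := []any{} // only used for logging, e.g. "cpu" 20 "memory" 20
//	for rname, rvalue := range thresholds {
//		criteria = append(criteria, rname, rvalue)
//	}
//
//	highThresholds := make(api.ResourceThresholds) // the plugin only cares about
//	for rname := range thresholds {                // the low side, so the high
//		highThresholds[rname] = MaxResourcePercentage // side is pinned at 100%
//	}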
@@ -34,120 +34,115 @@ import (

const LowNodeUtilizationPluginName = "LowNodeUtilization"

// LowNodeUtilization evicts pods from overutilized nodes to underutilized nodes. Note that CPU/Memory requests are used
// to calculate nodes' utilization and not the actual resource usage.

type LowNodeUtilization struct {
handle frameworktypes.Handle
args *LowNodeUtilizationArgs
podFilter func(pod *v1.Pod) bool
underutilizationCriteria []interface{}
overutilizationCriteria []interface{}
resourceNames []v1.ResourceName
extendedResourceNames []v1.ResourceName
usageClient usageClient
}

// this lines makes sure that HighNodeUtilization implements the BalancePlugin
// interface.
var _ frameworktypes.BalancePlugin = &LowNodeUtilization{}

// NewLowNodeUtilization builds plugin from its arguments while passing a handle
func NewLowNodeUtilization(args runtime.Object, handle frameworktypes.Handle) (frameworktypes.Plugin, error) {
lowNodeUtilizationArgsArgs, ok := args.(*LowNodeUtilizationArgs)
// LowNodeUtilization evicts pods from overutilized nodes to underutilized
// nodes. Note that CPU/Memory requests are used to calculate nodes'
// utilization and not the actual resource usage.
type LowNodeUtilization struct {
handle frameworktypes.Handle
args *LowNodeUtilizationArgs
podFilter func(pod *v1.Pod) bool
underCriteria []any
overCriteria []any
resourceNames []v1.ResourceName
extendedResourceNames []v1.ResourceName
usageClient usageClient
}

// NewLowNodeUtilization builds plugin from its arguments while passing a
// handle. this plugin aims to move workload from overutilized nodes to
// underutilized nodes.
func NewLowNodeUtilization(
genericArgs runtime.Object, handle frameworktypes.Handle,
) (frameworktypes.Plugin, error) {
args, ok := genericArgs.(*LowNodeUtilizationArgs)
if !ok {
return nil, fmt.Errorf("want args to be of type LowNodeUtilizationArgs, got %T", args)
return nil, fmt.Errorf(
"want args to be of type LowNodeUtilizationArgs, got %T",
genericArgs,
)
}

resourceNames := getResourceNames(lowNodeUtilizationArgsArgs.Thresholds)
// resourceNames holds a list of resources for which the user has
// provided thresholds for. extendedResourceNames holds those as well
// as cpu, memory and pods if no prometheus collection is used.
resourceNames := getResourceNames(args.Thresholds)
extendedResourceNames := resourceNames

metricsUtilization := lowNodeUtilizationArgsArgs.MetricsUtilization
if metricsUtilization != nil && metricsUtilization.Source == api.PrometheusMetrics {
if metricsUtilization.Prometheus != nil && metricsUtilization.Prometheus.Query != "" {
uResourceNames := getResourceNames(lowNodeUtilizationArgsArgs.Thresholds)
oResourceNames := getResourceNames(lowNodeUtilizationArgsArgs.TargetThresholds)
if len(uResourceNames) != 1 || uResourceNames[0] != MetricResource {
return nil, fmt.Errorf("thresholds are expected to specify a single instance of %q resource, got %v instead", MetricResource, uResourceNames)
}
if len(oResourceNames) != 1 || oResourceNames[0] != MetricResource {
return nil, fmt.Errorf("targetThresholds are expected to specify a single instance of %q resource, got %v instead", MetricResource, oResourceNames)
}
} else {
return nil, fmt.Errorf("prometheus query is missing")
// if we are using prometheus we need to validate we have everything we
// need. if we aren't then we need to make sure we are also collecting
// data for cpu, memory and pods.
metrics := args.MetricsUtilization
if metrics != nil && metrics.Source == api.PrometheusMetrics {
if err := validatePrometheusMetricsUtilization(args); err != nil {
return nil, err
}
} else {
extendedResourceNames = uniquifyResourceNames(append(resourceNames, v1.ResourceCPU, v1.ResourceMemory, v1.ResourcePods))
extendedResourceNames = uniquifyResourceNames(
append(
resourceNames,
v1.ResourceCPU,
v1.ResourceMemory,
v1.ResourcePods,
),
)
}

underutilizationCriteria := []interface{}{
"CPU", lowNodeUtilizationArgsArgs.Thresholds[v1.ResourceCPU],
"Mem", lowNodeUtilizationArgsArgs.Thresholds[v1.ResourceMemory],
"Pods", lowNodeUtilizationArgsArgs.Thresholds[v1.ResourcePods],
// underCriteria and overCriteria are slices used for logging purposes.
// we assemble them only once.
underCriteria, overCriteria := []any{}, []any{}
for name := range args.Thresholds {
underCriteria = append(underCriteria, name, args.Thresholds[name])
}
for name := range lowNodeUtilizationArgsArgs.Thresholds {
if !nodeutil.IsBasicResource(name) {
underutilizationCriteria = append(underutilizationCriteria, string(name), int64(lowNodeUtilizationArgsArgs.Thresholds[name]))
}
for name := range args.TargetThresholds {
overCriteria = append(overCriteria, name, args.TargetThresholds[name])
}

overutilizationCriteria := []interface{}{
"CPU", lowNodeUtilizationArgsArgs.TargetThresholds[v1.ResourceCPU],
"Mem", lowNodeUtilizationArgsArgs.TargetThresholds[v1.ResourceMemory],
"Pods", lowNodeUtilizationArgsArgs.TargetThresholds[v1.ResourcePods],
}
for name := range lowNodeUtilizationArgsArgs.TargetThresholds {
if !nodeutil.IsBasicResource(name) {
overutilizationCriteria = append(overutilizationCriteria, string(name), int64(lowNodeUtilizationArgsArgs.TargetThresholds[name]))
}
}

podFilter, err := podutil.NewOptions().
podFilter, err := podutil.
NewOptions().
WithFilter(handle.Evictor().Filter).
BuildFilterFunc()
if err != nil {
return nil, fmt.Errorf("error initializing pod filter function: %v", err)
}

var usageClient usageClient
// MetricsServer is deprecated, removed once dropped
if metricsUtilization != nil {
switch {
case metricsUtilization.MetricsServer, metricsUtilization.Source == api.KubernetesMetrics:
if handle.MetricsCollector() == nil {
return nil, fmt.Errorf("metrics client not initialized")
}
usageClient = newActualUsageClient(extendedResourceNames, handle.GetPodsAssignedToNodeFunc(), handle.MetricsCollector())
case metricsUtilization.Source == api.PrometheusMetrics:
if handle.PrometheusClient() == nil {
return nil, fmt.Errorf("prometheus client not initialized")
}
usageClient = newPrometheusUsageClient(handle.GetPodsAssignedToNodeFunc(), handle.PrometheusClient(), metricsUtilization.Prometheus.Query)
case metricsUtilization.Source != "":
return nil, fmt.Errorf("unrecognized metrics source")
default:
return nil, fmt.Errorf("metrics source is empty")
// this plugins supports different ways of collecting usage data. each
// different way provides its own "usageClient". here we make sure we
// have the correct one or an error is triggered. XXX MetricsServer is
// deprecated, removed once dropped.
var usageClient usageClient = newRequestedUsageClient(
extendedResourceNames, handle.GetPodsAssignedToNodeFunc(),
)
if metrics != nil {
usageClient, err = usageClientForMetrics(args, handle, extendedResourceNames)
if err != nil {
return nil, err
}
} else {
usageClient = newRequestedUsageClient(extendedResourceNames, handle.GetPodsAssignedToNodeFunc())
}

return &LowNodeUtilization{
handle: handle,
args: lowNodeUtilizationArgsArgs,
underutilizationCriteria: underutilizationCriteria,
overutilizationCriteria: overutilizationCriteria,
resourceNames: resourceNames,
extendedResourceNames: extendedResourceNames,
podFilter: podFilter,
usageClient: usageClient,
handle: handle,
args: args,
underCriteria: underCriteria,
overCriteria: overCriteria,
resourceNames: resourceNames,
extendedResourceNames: extendedResourceNames,
podFilter: podFilter,
usageClient: usageClient,
}, nil
}

// Name retrieves the plugin name
// Name retrieves the plugin name.
func (l *LowNodeUtilization) Name() string {
return LowNodeUtilizationPluginName
}

// Balance extension point implementation for the plugin
// Balance holds the main logic of the plugin. It evicts pods from over
// utilized nodes to under utilized nodes. The goal here is to evenly
// distribute pods across nodes.
func (l *LowNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status {
if err := l.usageClient.sync(ctx, nodes); err != nil {
return &frameworktypes.Status{
@@ -155,75 +150,100 @@ func (l *LowNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *fra
}
}

// starts by taking a snapshot ofthe nodes usage. we will use this
// snapshot to assess the nodes usage and classify them as
// underutilized or overutilized.
nodesMap, nodesUsageMap, podListMap := getNodeUsageSnapshot(nodes, l.usageClient)
capacities := referencedResourceListForNodesCapacity(nodes)

// usage, by default, is exposed in absolute values. we need to normalize
// them (convert them to percentages) to be able to compare them with the
// user provided thresholds. thresholds are already provided in percentage
// in the <0; 100> interval.
var usage map[string]api.ResourceThresholds
var thresholds map[string][]api.ResourceThresholds
if l.args.UseDeviationThresholds {
// here the thresholds provided by the user represent
// deviations from the average so we need to treat them
// differently. when calculating the average we only
// need to consider the resources for which the user
// has provided thresholds.
usage, thresholds = assessNodesUsagesAndRelativeThresholds(
filterResourceNamesFromNodeUsage(nodesUsageMap, l.resourceNames),
filterResourceNames(nodesUsageMap, l.resourceNames),
capacities,
l.args.Thresholds,
l.args.TargetThresholds,
)
} else {
usage, thresholds = assessNodesUsagesAndStaticThresholds(
filterResourceNamesFromNodeUsage(nodesUsageMap, l.resourceNames),
nodesUsageMap,
capacities,
l.args.Thresholds,
l.args.TargetThresholds,
)
}

// classify nodes in under and over utilized. we will later try to move
// pods from the overutilized nodes to the underutilized ones.
nodeGroups := classifyNodeUsage(
usage,
thresholds,
usage, thresholds,
[]classifierFnc{
// underutilization
// underutilization criteria processing. nodes that are
// underutilized but aren't schedulable are ignored.
func(nodeName string, usage, threshold api.ResourceThresholds) bool {
if nodeutil.IsNodeUnschedulable(nodesMap[nodeName]) {
klog.V(2).InfoS("Node is unschedulable, thus not considered as underutilized", "node", klog.KObj(nodesMap[nodeName]))
klog.V(2).InfoS(
"Node is unschedulable, thus not considered as underutilized",
"node", klog.KObj(nodesMap[nodeName]),
)
return false
}
return isNodeBelowThreshold(usage, threshold)
},
// overutilization
// overutilization criteria evaluation.
func(nodeName string, usage, threshold api.ResourceThresholds) bool {
return isNodeAboveThreshold(usage, threshold)
},
},
)

// convert groups node []NodeInfo
// the nodeutilization package was designed to work with NodeInfo
// structs. these structs holds information about how utilized a node
// is. we need to go through the result of the classification and turn
// it into NodeInfo structs.
nodeInfos := make([][]NodeInfo, 2)
category := []string{"underutilized", "overutilized"}
listedNodes := map[string]struct{}{}
categories := []string{"underutilized", "overutilized"}
classifiedNodes := map[string]bool{}
for i := range nodeGroups {
for nodeName := range nodeGroups[i] {
classifiedNodes[nodeName] = true

klog.InfoS(
fmt.Sprintf("Node is %s", category[i]),
"Node has been classified",
"category", categories[i],
"node", klog.KObj(nodesMap[nodeName]),
"usage", nodesUsageMap[nodeName],
"usagePercentage", normalizer.Round(usage[nodeName]),
)
listedNodes[nodeName] = struct{}{}

nodeInfos[i] = append(nodeInfos[i], NodeInfo{
NodeUsage: NodeUsage{
node: nodesMap[nodeName],
usage: nodesUsageMap[nodeName], // get back the original node usage
usage: nodesUsageMap[nodeName],
allPods: podListMap[nodeName],
},
thresholds: NodeThresholds{
lowResourceThreshold: resourceThresholdsToNodeUsage(thresholds[nodeName][0], capacities[nodeName], append(l.extendedResourceNames, v1.ResourceCPU, v1.ResourceMemory, v1.ResourcePods)),
highResourceThreshold: resourceThresholdsToNodeUsage(thresholds[nodeName][1], capacities[nodeName], append(l.extendedResourceNames, v1.ResourceCPU, v1.ResourceMemory, v1.ResourcePods)),
},
available: capNodeCapacitiesToThreshold(
nodesMap[nodeName],
thresholds[nodeName][1],
l.extendedResourceNames,
),
})
}
}

// log nodes that are appropriately utilized.
for nodeName := range nodesMap {
if _, ok := listedNodes[nodeName]; !ok {
if !classifiedNodes[nodeName] {
klog.InfoS(
"Node is appropriately utilized",
"node", klog.KObj(nodesMap[nodeName]),
@@ -233,24 +253,27 @@ func (l *LowNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *fra
}
}

lowNodes := nodeInfos[0]
sourceNodes := nodeInfos[1]
lowNodes, highNodes := nodeInfos[0], nodeInfos[1]

// log message for nodes with low utilization
klog.V(1).InfoS("Criteria for a node under utilization", l.underutilizationCriteria...)
// log messages for nodes with low and high utilization
klog.V(1).InfoS("Criteria for a node under utilization", l.underCriteria...)
klog.V(1).InfoS("Number of underutilized nodes", "totalNumber", len(lowNodes))

// log message for over utilized nodes
klog.V(1).InfoS("Criteria for a node above target utilization", l.overutilizationCriteria...)
klog.V(1).InfoS("Number of overutilized nodes", "totalNumber", len(sourceNodes))
klog.V(1).InfoS("Criteria for a node above target utilization", l.overCriteria...)
klog.V(1).InfoS("Number of overutilized nodes", "totalNumber", len(highNodes))

if len(lowNodes) == 0 {
klog.V(1).InfoS("No node is underutilized, nothing to do here, you might tune your thresholds further")
klog.V(1).InfoS(
"No node is underutilized, nothing to do here, you might tune your thresholds further",
)
return nil
}

if len(lowNodes) <= l.args.NumberOfNodes {
klog.V(1).InfoS("Number of nodes underutilized is less or equal than NumberOfNodes, nothing to do here", "underutilizedNodes", len(lowNodes), "numberOfNodes", l.args.NumberOfNodes)
klog.V(1).InfoS(
"Number of nodes underutilized is less or equal than NumberOfNodes, nothing to do here",
"underutilizedNodes", len(lowNodes),
"numberOfNodes", l.args.NumberOfNodes,
)
return nil
}

@@ -259,14 +282,15 @@ func (l *LowNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *fra
return nil
}

if len(sourceNodes) == 0 {
if len(highNodes) == 0 {
klog.V(1).InfoS("All nodes are under target utilization, nothing to do here")
return nil
}

// stop if node utilization drops below target threshold or any of required capacity (cpu, memory, pods) is moved
// this is a stop condition for the eviction process. we stop as soon
// as the node usage drops below the threshold.
continueEvictionCond := func(nodeInfo NodeInfo, totalAvailableUsage api.ReferencedResourceList) bool {
if !isNodeAboveTargetUtilization(nodeInfo.NodeUsage, nodeInfo.thresholds.highResourceThreshold) {
if !isNodeAboveTargetUtilization(nodeInfo.NodeUsage, nodeInfo.available) {
return false
}
for name := range totalAvailableUsage {
@@ -278,8 +302,8 @@ func (l *LowNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *fra
return true
}

// Sort the nodes by the usage in descending order
sortNodesByUsage(sourceNodes, false)
// sort the nodes by the usage in descending order
sortNodesByUsage(highNodes, false)

var nodeLimit *uint
if l.args.EvictionLimits != nil {
@@ -289,7 +313,7 @@ func (l *LowNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *fra
evictPodsFromSourceNodes(
ctx,
l.args.EvictableNamespaces,
sourceNodes,
highNodes,
lowNodes,
l.handle.Evictor(),
evictions.EvictOptions{StrategyName: LowNodeUtilizationPluginName},
@@ -302,3 +326,66 @@ func (l *LowNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *fra

return nil
}

// validatePrometheusMetricsUtilization validates the Prometheus metrics
// utilization. XXX this should be done way earlier than this.
func validatePrometheusMetricsUtilization(args *LowNodeUtilizationArgs) error {
if args.MetricsUtilization.Prometheus == nil {
return fmt.Errorf("prometheus property is missing")
}

if args.MetricsUtilization.Prometheus.Query == "" {
return fmt.Errorf("prometheus query is missing")
}

uResourceNames := getResourceNames(args.Thresholds)
oResourceNames := getResourceNames(args.TargetThresholds)
if len(uResourceNames) != 1 || uResourceNames[0] != MetricResource {
return fmt.Errorf(
"thresholds are expected to specify a single instance of %q resource, got %v instead",
MetricResource, uResourceNames,
)
}

if len(oResourceNames) != 1 || oResourceNames[0] != MetricResource {
return fmt.Errorf(
"targetThresholds are expected to specify a single instance of %q resource, got %v instead",
MetricResource, oResourceNames,
)
}

return nil
}
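// Editor's illustration, not part of this commit: a sketch of arguments that
// would pass validatePrometheusMetricsUtilization. Only the field and constant
// names are taken from the code above; the nested struct type names and the
// query string are assumptions.
//
//	args := &LowNodeUtilizationArgs{
//		Thresholds:       api.ResourceThresholds{MetricResource: 30},
//		TargetThresholds: api.ResourceThresholds{MetricResource: 70},
//		MetricsUtilization: &MetricsUtilization{
//			Source:     api.PrometheusMetrics,
//			Prometheus: &Prometheus{Query: "instance:node_cpu:rate:sum"},
//		},
//	}
//	// validatePrometheusMetricsUtilization(args) returns nil: the query is set
//	// and both threshold maps reference exactly one resource, MetricResource.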
// usageClientForMetrics returns the correct usage client based on the
// metrics source. XXX MetricsServer is deprecated, removed once dropped.
func usageClientForMetrics(
args *LowNodeUtilizationArgs, handle frameworktypes.Handle, resources []v1.ResourceName,
) (usageClient, error) {
metrics := args.MetricsUtilization
switch {
case metrics.MetricsServer, metrics.Source == api.KubernetesMetrics:
if handle.MetricsCollector() == nil {
return nil, fmt.Errorf("metrics client not initialized")
}
return newActualUsageClient(
resources,
handle.GetPodsAssignedToNodeFunc(),
handle.MetricsCollector(),
), nil

case metrics.Source == api.PrometheusMetrics:
if handle.PrometheusClient() == nil {
return nil, fmt.Errorf("prometheus client not initialized")
}
return newPrometheusUsageClient(
handle.GetPodsAssignedToNodeFunc(),
handle.PrometheusClient(),
metrics.Prometheus.Query,
), nil
case metrics.Source != "":
return nil, fmt.Errorf("unrecognized metrics source")
default:
return nil, fmt.Errorf("metrics source is empty")
}
}

@@ -18,6 +18,7 @@ package nodeutilization

import (
"context"
"fmt"
"maps"
"slices"
"sort"
@@ -59,36 +60,41 @@ import (
// - thresholds: map[string][]api.ReferencedResourceList
// - pod list: map[string][]*v1.Pod
// Once the nodes are classified produce the original []NodeInfo so the code is not that much changed (postponing further refactoring once it is needed)
const MetricResource = v1.ResourceName("MetricResource")

// NodeUsage stores a node's info, pods on it, thresholds and its resource usage
type NodeUsage struct {
node *v1.Node
usage api.ReferencedResourceList
allPods []*v1.Pod
}

type NodeThresholds struct {
lowResourceThreshold api.ReferencedResourceList
highResourceThreshold api.ReferencedResourceList
}

type NodeInfo struct {
NodeUsage
thresholds NodeThresholds
}

type continueEvictionCond func(nodeInfo NodeInfo, totalAvailableUsage api.ReferencedResourceList) bool

const (
// MetricResource is a special resource name we use to keep track of a
// metric obtained from a third party entity.
MetricResource = v1.ResourceName("MetricResource")
// MinResourcePercentage is the minimum value of a resource's percentage
MinResourcePercentage = 0
// MaxResourcePercentage is the maximum value of a resource's percentage
MaxResourcePercentage = 100
)

// getNodeUsageSnapshot separates the snapshot into easily accesible
// data chunks so the node usage can be processed separately.
// NodeUsage stores a node's info, pods on it, thresholds and its resource
// usage.
type NodeUsage struct {
node *v1.Node
usage api.ReferencedResourceList
allPods []*v1.Pod
}

// NodeInfo is an entity we use to gather information about a given node. here
// we have its resource usage as well as the amount of available resources.
// we use this struct to carry information around and to make it easier to
// process.
type NodeInfo struct {
NodeUsage
available api.ReferencedResourceList
}

// continueEvictionCont is a function that determines if we should keep
// evicting pods or not.
type continueEvictionCond func(NodeInfo, api.ReferencedResourceList) bool

// getNodeUsageSnapshot separates the snapshot into easily accesible data
// chunks so the node usage can be processed separately. returns a map of
// nodes, a map of their usage and a map of their pods. maps are indexed
// by node name.
func getNodeUsageSnapshot(
nodes []*v1.Node,
usageClient usageClient,
@@ -97,10 +103,11 @@ func getNodeUsageSnapshot(
map[string]api.ReferencedResourceList,
map[string][]*v1.Pod,
) {
nodesMap := make(map[string]*v1.Node)
// node usage needs to be kept in the original resource quantity since converting to percentages and back is losing precision
// XXX node usage needs to be kept in the original resource quantity
// since converting to percentages and back is losing precision.
nodesUsageMap := make(map[string]api.ReferencedResourceList)
podListMap := make(map[string][]*v1.Pod)
nodesMap := make(map[string]*v1.Node)

for _, node := range nodes {
nodesMap[node.Name] = node
@@ -111,45 +118,13 @@ func getNodeUsageSnapshot(
return nodesMap, nodesUsageMap, podListMap
}

func resourceThreshold(nodeCapacity api.ReferencedResourceList, resourceName v1.ResourceName, threshold api.Percentage) *resource.Quantity {
defaultFormat := resource.DecimalSI
if resourceName == v1.ResourceMemory {
defaultFormat = resource.BinarySI
}

resourceCapacityFraction := func(resourceNodeCapacity int64) int64 {
// A threshold is in percentages but in <0;100> interval.
// Performing `threshold * 0.01` will convert <0;100> interval into <0;1>.
// Multiplying it with capacity will give fraction of the capacity corresponding to the given resource threshold in Quantity units.
return int64(float64(threshold) * 0.01 * float64(resourceNodeCapacity))
}

resourceCapacityQuantity := &resource.Quantity{Format: defaultFormat}
if _, ok := nodeCapacity[resourceName]; ok {
resourceCapacityQuantity = nodeCapacity[resourceName]
}

if resourceName == v1.ResourceCPU {
return resource.NewMilliQuantity(resourceCapacityFraction(resourceCapacityQuantity.MilliValue()), defaultFormat)
}
return resource.NewQuantity(resourceCapacityFraction(resourceCapacityQuantity.Value()), defaultFormat)
}

func resourceThresholdsToNodeUsage(resourceThresholds api.ResourceThresholds, capacity api.ReferencedResourceList, resourceNames []v1.ResourceName) api.ReferencedResourceList {
nodeUsage := make(api.ReferencedResourceList)
for resourceName, threshold := range resourceThresholds {
nodeUsage[resourceName] = resourceThreshold(capacity, resourceName, threshold)
}
for _, resourceName := range resourceNames {
if _, exists := nodeUsage[resourceName]; !exists {
nodeUsage[resourceName] = capacity[resourceName]
}
}
return nodeUsage
}

type classifierFnc func(nodeName string, value, threshold api.ResourceThresholds) bool
// classifierFnc is a function that classifies a node based on its usage and
// thresholds. returns true if it belongs to the group the classifier
// represents.
type classifierFnc func(string, api.ResourceThresholds, api.ResourceThresholds) bool

// classifyNodeUsage classify nodes into different groups based on classifiers.
// returns one group for each classifier.
func classifyNodeUsage(
nodeUsageAsNodeThresholds map[string]api.ResourceThresholds,
nodeThresholdsMap map[string][]api.ResourceThresholds,
@@ -172,9 +147,10 @@ func classifyNodeUsage(
return nodeGroups
}

func usageToKeysAndValues(usage api.ReferencedResourceList) []interface{} {
// log message in one line
keysAndValues := []interface{}{}
// usageToKeysAndValues converts a ReferencedResourceList into a list of
// keys and values. this is useful for logging.
func usageToKeysAndValues(usage api.ReferencedResourceList) []any {
keysAndValues := []any{}
if quantity, exists := usage[v1.ResourceCPU]; exists {
keysAndValues = append(keysAndValues, "CPU", quantity.MilliValue())
}
@@ -186,15 +162,14 @@ func usageToKeysAndValues(usage api.ReferencedResourceList) []interface{} {
}
for name := range usage {
if !nodeutil.IsBasicResource(name) {
keysAndValues = append(keysAndValues, string(name), usage[name].Value())
keysAndValues = append(keysAndValues, name, usage[name].Value())
}
}
return keysAndValues
}

// evictPodsFromSourceNodes evicts pods based on priority, if all the pods on the node have priority, if not
// evicts them based on QoS as fallback option.
// TODO: @ravig Break this function into smaller functions.
// evictPodsFromSourceNodes evicts pods based on priority, if all the pods on
// the node have priority, if not evicts them based on QoS as fallback option.
func evictPodsFromSourceNodes(
ctx context.Context,
evictableNamespaces *api.Namespaces,
@@ -207,48 +182,65 @@ func evictPodsFromSourceNodes(
usageClient usageClient,
maxNoOfPodsToEvictPerNode *uint,
) {
// upper bound on total number of pods/cpu/memory and optional extended resources to be moved
totalAvailableUsage := api.ReferencedResourceList{}
for _, resourceName := range resourceNames {
totalAvailableUsage[resourceName] = &resource.Quantity{}
available, err := assessAvailableResourceInNodes(destinationNodes, resourceNames)
if err != nil {
klog.ErrorS(err, "unable to assess available resources in nodes")
return
}

taintsOfDestinationNodes := make(map[string][]v1.Taint, len(destinationNodes))
klog.V(1).InfoS("Total capacity to be moved", usageToKeysAndValues(available)...)

destinationTaints := make(map[string][]v1.Taint, len(destinationNodes))
for _, node := range destinationNodes {
taintsOfDestinationNodes[node.node.Name] = node.node.Spec.Taints

for _, name := range resourceNames {
if _, exists := node.usage[name]; !exists {
klog.Errorf("unable to find %q resource in node's %q usage, terminating eviction", name, node.node.Name)
return
}
if _, ok := totalAvailableUsage[name]; !ok {
totalAvailableUsage[name] = resource.NewQuantity(0, resource.DecimalSI)
}
totalAvailableUsage[name].Add(*node.thresholds.highResourceThreshold[name])
totalAvailableUsage[name].Sub(*node.usage[name])
}
destinationTaints[node.node.Name] = node.node.Spec.Taints
}

// log message in one line
klog.V(1).InfoS("Total capacity to be moved", usageToKeysAndValues(totalAvailableUsage)...)

for _, node := range sourceNodes {
klog.V(3).InfoS("Evicting pods from node", "node", klog.KObj(node.node), "usage", node.usage)
klog.V(3).InfoS(
"Evicting pods from node",
"node", klog.KObj(node.node),
"usage", node.usage,
)

nonRemovablePods, removablePods := classifyPods(node.allPods, podFilter)
klog.V(2).InfoS("Pods on node", "node", klog.KObj(node.node), "allPods", len(node.allPods), "nonRemovablePods", len(nonRemovablePods), "removablePods", len(removablePods))
klog.V(2).InfoS(
"Pods on node",
"node", klog.KObj(node.node),
"allPods", len(node.allPods),
"nonRemovablePods", len(nonRemovablePods),
"removablePods", len(removablePods),
)

if len(removablePods) == 0 {
klog.V(1).InfoS("No removable pods on node, try next node", "node", klog.KObj(node.node))
klog.V(1).InfoS(
"No removable pods on node, try next node",
"node", klog.KObj(node.node),
)
continue
}

klog.V(1).InfoS("Evicting pods based on priority, if they have same priority, they'll be evicted based on QoS tiers")
// sort the evictable Pods based on priority. This also sorts them based on QoS. If there are multiple pods with same priority, they are sorted based on QoS tiers.
klog.V(1).InfoS(
"Evicting pods based on priority, if they have same priority, they'll be evicted based on QoS tiers",
)

// sort the evictable Pods based on priority. This also sorts
// them based on QoS. If there are multiple pods with same
// priority, they are sorted based on QoS tiers.
podutil.SortPodsBasedOnPriorityLowToHigh(removablePods)
err := evictPods(ctx, evictableNamespaces, removablePods, node, totalAvailableUsage, taintsOfDestinationNodes, podEvictor, evictOptions, continueEviction, usageClient, maxNoOfPodsToEvictPerNode)
if err != nil {

if err := evictPods(
ctx,
evictableNamespaces,
removablePods,
node,
available,
destinationTaints,
podEvictor,
evictOptions,
continueEviction,
usageClient,
maxNoOfPodsToEvictPerNode,
); err != nil {
switch err.(type) {
case *evictions.EvictionTotalLimitError:
return
@@ -258,103 +250,136 @@ func evictPodsFromSourceNodes(
}
}

// evictPods keeps evicting pods until the continueEviction function returns
// false or we can't or shouldn't evict any more pods. available node resources
// are updated after each eviction.
func evictPods(
ctx context.Context,
evictableNamespaces *api.Namespaces,
inputPods []*v1.Pod,
nodeInfo NodeInfo,
totalAvailableUsage api.ReferencedResourceList,
taintsOfLowNodes map[string][]v1.Taint,
destinationTaints map[string][]v1.Taint,
podEvictor frameworktypes.Evictor,
evictOptions evictions.EvictOptions,
continueEviction continueEvictionCond,
usageClient usageClient,
maxNoOfPodsToEvictPerNode *uint,
) error {
// preemptive check to see if we should continue evicting pods.
if !continueEviction(nodeInfo, totalAvailableUsage) {
return nil
}

// some namespaces can be excluded from the eviction process.
var excludedNamespaces sets.Set[string]
if evictableNamespaces != nil {
excludedNamespaces = sets.New(evictableNamespaces.Exclude...)
}

var evictionCounter uint = 0
if continueEviction(nodeInfo, totalAvailableUsage) {
for _, pod := range inputPods {
if maxNoOfPodsToEvictPerNode != nil && evictionCounter >= *maxNoOfPodsToEvictPerNode {
klog.V(3).InfoS("Max number of evictions per node per plugin reached", "limit", *maxNoOfPodsToEvictPerNode)
break
}
if !utils.PodToleratesTaints(pod, taintsOfLowNodes) {
klog.V(3).InfoS("Skipping eviction for pod, doesn't tolerate node taint", "pod", klog.KObj(pod))
for _, pod := range inputPods {
if maxNoOfPodsToEvictPerNode != nil && evictionCounter >= *maxNoOfPodsToEvictPerNode {
klog.V(3).InfoS(
"Max number of evictions per node per plugin reached",
"limit", *maxNoOfPodsToEvictPerNode,
)
break
}

if !utils.PodToleratesTaints(pod, destinationTaints) {
klog.V(3).InfoS(
"Skipping eviction for pod, doesn't tolerate node taint",
"pod", klog.KObj(pod),
)
continue
}

// verify if we can evict the pod based on the pod evictor
// filter and on the excluded namespaces.
preEvictionFilterWithOptions, err := podutil.
NewOptions().
WithFilter(podEvictor.PreEvictionFilter).
WithoutNamespaces(excludedNamespaces).
BuildFilterFunc()
if err != nil {
klog.ErrorS(err, "could not build preEvictionFilter with namespace exclusion")
continue
}

if !preEvictionFilterWithOptions(pod) {
continue
}

// in case podUsage does not support resource counting (e.g.
// provided metric does not quantify pod resource utilization).
unconstrainedResourceEviction := false
podUsage, err := usageClient.podUsage(pod)
if err != nil {
if _, ok := err.(*notSupportedError); !ok {
klog.Errorf(
"unable to get pod usage for %v/%v: %v",
pod.Namespace, pod.Name, err,
)
continue
}
unconstrainedResourceEviction = true
}

preEvictionFilterWithOptions, err := podutil.NewOptions().
WithFilter(podEvictor.PreEvictionFilter).
WithoutNamespaces(excludedNamespaces).
BuildFilterFunc()
if err != nil {
klog.ErrorS(err, "could not build preEvictionFilter with namespace exclusion")
continue
}

if !preEvictionFilterWithOptions(pod) {
continue
}

// In case podUsage does not support resource counting (e.g. provided metric
// does not quantify pod resource utilization).
unconstrainedResourceEviction := false
podUsage, err := usageClient.podUsage(pod)
if err != nil {
if _, ok := err.(*notSupportedError); !ok {
klog.Errorf("unable to get pod usage for %v/%v: %v", pod.Namespace, pod.Name, err)
continue
}
unconstrainedResourceEviction = true
}
err = podEvictor.Evict(ctx, pod, evictOptions)
if err == nil {
if maxNoOfPodsToEvictPerNode == nil && unconstrainedResourceEviction {
klog.V(3).InfoS("Currently, only a single pod eviction is allowed")
break
}
evictionCounter++
klog.V(3).InfoS("Evicted pods", "pod", klog.KObj(pod))
if unconstrainedResourceEviction {
continue
}
for name := range totalAvailableUsage {
if name == v1.ResourcePods {
nodeInfo.usage[name].Sub(*resource.NewQuantity(1, resource.DecimalSI))
totalAvailableUsage[name].Sub(*resource.NewQuantity(1, resource.DecimalSI))
} else {
nodeInfo.usage[name].Sub(*podUsage[name])
totalAvailableUsage[name].Sub(*podUsage[name])
}
}

keysAndValues := []interface{}{
"node", nodeInfo.node.Name,
}
keysAndValues = append(keysAndValues, usageToKeysAndValues(nodeInfo.usage)...)
klog.V(3).InfoS("Updated node usage", keysAndValues...)
// check if pods can be still evicted
if !continueEviction(nodeInfo, totalAvailableUsage) {
break
}
continue
}
if err := podEvictor.Evict(ctx, pod, evictOptions); err != nil {
switch err.(type) {
case *evictions.EvictionNodeLimitError, *evictions.EvictionTotalLimitError:
return err
default:
klog.Errorf("eviction failed: %v", err)
continue
}
}

if maxNoOfPodsToEvictPerNode == nil && unconstrainedResourceEviction {
klog.V(3).InfoS("Currently, only a single pod eviction is allowed")
break
}

evictionCounter++
klog.V(3).InfoS("Evicted pods", "pod", klog.KObj(pod))
if unconstrainedResourceEviction {
continue
}

subtractPodUsageFromNodeAvailability(totalAvailableUsage, &nodeInfo, podUsage)

keysAndValues := []any{"node", nodeInfo.node.Name}
keysAndValues = append(keysAndValues, usageToKeysAndValues(nodeInfo.usage)...)
klog.V(3).InfoS("Updated node usage", keysAndValues...)

// make sure we should continue evicting pods.
if !continueEviction(nodeInfo, totalAvailableUsage) {
break
}
}
return nil
}

// subtractPodUsageFromNodeAvailability subtracts the pod usage from the node
// available resources. this is done to keep track of the remaining resources
// that can be used to move pods around.
func subtractPodUsageFromNodeAvailability(
available api.ReferencedResourceList,
nodeInfo *NodeInfo,
podUsage api.ReferencedResourceList,
) {
for name := range available {
if name == v1.ResourcePods {
nodeInfo.usage[name].Sub(*resource.NewQuantity(1, resource.DecimalSI))
available[name].Sub(*resource.NewQuantity(1, resource.DecimalSI))
continue
}
nodeInfo.usage[name].Sub(*podUsage[name])
available[name].Sub(*podUsage[name])
}
}

// sortNodesByUsage sorts nodes based on usage according to the given plugin.
func sortNodesByUsage(nodes []NodeInfo, ascending bool) {
sort.Slice(nodes, func(i, j int) bool {
@@ -428,6 +453,8 @@ func getResourceNames(thresholds api.ResourceThresholds) []v1.ResourceName {
return resourceNames
}

// classifyPods classify them in two lists: removable and non-removable.
// Removable pods are those that can be evicted.
func classifyPods(pods []*v1.Pod, filter func(pod *v1.Pod) bool) ([]*v1.Pod, []*v1.Pod) {
var nonRemovablePods, removablePods []*v1.Pod

@@ -507,29 +534,36 @@ func assessNodesUsagesAndRelativeThresholds(
func referencedResourceListForNodesCapacity(nodes []*v1.Node) map[string]api.ReferencedResourceList {
capacities := map[string]api.ReferencedResourceList{}
for _, node := range nodes {
capacity := node.Status.Capacity
if len(node.Status.Allocatable) > 0 {
capacity = node.Status.Allocatable
}

referenced := api.ReferencedResourceList{}
for name, quantity := range capacity {
referenced[name] = ptr.To(quantity)
}

// XXX the descheduler also manages monitoring queries that are
// supposed to return a value representing a percentage of the
// resource usage. In this case we need to provide a value for
// the MetricResource, which is not present in the node capacity.
referenced[MetricResource] = resource.NewQuantity(
100, resource.DecimalSI,
)

capacities[node.Name] = referenced
capacities[node.Name] = referencedResourceListForNodeCapacity(node)
}
return capacities
}

// referencedResourceListForNodeCapacity returns a ReferencedResourceList for
// the capacity of a node. If allocatable resources are present, they are used
// instead of capacity.
func referencedResourceListForNodeCapacity(node *v1.Node) api.ReferencedResourceList {
capacity := node.Status.Capacity
if len(node.Status.Allocatable) > 0 {
capacity = node.Status.Allocatable
}

referenced := api.ReferencedResourceList{}
for name, quantity := range capacity {
referenced[name] = ptr.To(quantity)
}

// XXX the descheduler also manages monitoring queries that are
// supposed to return a value representing a percentage of the
// resource usage. In this case we need to provide a value for
// the MetricResource, which is not present in the node capacity.
referenced[MetricResource] = resource.NewQuantity(
100, resource.DecimalSI,
)

return referenced
}

// ResourceUsage2ResourceThreshold is an implementation of a Normalizer that
// converts a set of resource usages and totals into percentage. This function
// operates on Quantity Value() for all the resources except CPU, where it uses
@@ -570,20 +604,16 @@ func uniquifyResourceNames(resourceNames []v1.ResourceName) []v1.ResourceName {
for _, resourceName := range resourceNames {
resourceNamesMap[resourceName] = true
}
extendedResourceNames := []v1.ResourceName{}
for resourceName := range resourceNamesMap {
extendedResourceNames = append(extendedResourceNames, resourceName)
}
return extendedResourceNames
return slices.Collect(maps.Keys(resourceNamesMap))
}
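// Editor's illustration, not part of this commit: the iterator based
// de-duplication now used by uniquifyResourceNames, via the maps and slices
// packages imported above, with assumed input values. Note the resulting
// order is unspecified because map iteration order is random.
//
//	in := []v1.ResourceName{v1.ResourceCPU, v1.ResourceMemory, v1.ResourceCPU}
//	seen := map[v1.ResourceName]bool{}
//	for _, name := range in {
//		seen[name] = true
//	}
//	unique := slices.Collect(maps.Keys(seen)) // {cpu, memory} in some order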
// filterResourceNamesFromNodeUsage removes from the node usage slice all keys
// that are not present in the resourceNames slice.
func filterResourceNamesFromNodeUsage(
nodeUsage map[string]api.ReferencedResourceList, resourceNames []v1.ResourceName,
func filterResourceNames(
from map[string]api.ReferencedResourceList, resourceNames []v1.ResourceName,
) map[string]api.ReferencedResourceList {
newNodeUsage := make(map[string]api.ReferencedResourceList)
for nodeName, usage := range nodeUsage {
for nodeName, usage := range from {
newNodeUsage[nodeName] = api.ReferencedResourceList{}
for _, resourceName := range resourceNames {
if _, exists := usage[resourceName]; exists {
@@ -593,3 +623,127 @@ func filterResourceNamesFromNodeUsage(
}
return newNodeUsage
}

// capNodeCapacitiesToThreshold caps the node capacities to the given
// thresholds. if a threshold is not set for a resource, the full capacity is
// returned.
func capNodeCapacitiesToThreshold(
node *v1.Node,
thresholds api.ResourceThresholds,
resourceNames []v1.ResourceName,
) api.ReferencedResourceList {
capped := api.ReferencedResourceList{}
for _, resourceName := range resourceNames {
capped[resourceName] = capNodeCapacityToThreshold(
node, thresholds, resourceName,
)
}
return capped
}

// capNodeCapacityToThreshold caps the node capacity to the given threshold. if
// no threshold is set for the resource, the full capacity is returned.
func capNodeCapacityToThreshold(
node *v1.Node, thresholds api.ResourceThresholds, resourceName v1.ResourceName,
) *resource.Quantity {
capacities := referencedResourceListForNodeCapacity(node)
if _, ok := capacities[resourceName]; !ok {
// if the node knows nothing about the resource we return a
// zero capacity for it.
return resource.NewQuantity(0, resource.DecimalSI)
}

// if no threshold is set then we simply return the full capacity.
if _, ok := thresholds[resourceName]; !ok {
return capacities[resourceName]
}

// now that we have a capacity and a threshold we need to do the math
// to cap the former to the latter.
quantity := capacities[resourceName]
threshold := thresholds[resourceName]

// we have a different format for memory. all the other resources are
// in the DecimalSI format.
format := resource.DecimalSI
if resourceName == v1.ResourceMemory {
format = resource.BinarySI
}

// this is what we use to cap the capacity. thresholds are expected to
// be in the <0;100> interval.
fraction := func(threshold api.Percentage, capacity int64) int64 {
return int64(float64(threshold) * 0.01 * float64(capacity))
}

// here we also vary a little bit. milli is used for cpu, all the rest
// goes with the default.
if resourceName == v1.ResourceCPU {
return resource.NewMilliQuantity(
fraction(threshold, quantity.MilliValue()),
format,
)
}

return resource.NewQuantity(
fraction(threshold, quantity.Value()),
format,
)
}
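// Editor's illustration, not part of this commit: the capping arithmetic above
// applied to an assumed 8 core node with a 40% CPU threshold. 40% of 8000
// milli-cores is 3200 milli-cores, so the capped quantity prints as "3200m".
//
//	capacity := resource.NewMilliQuantity(8000, resource.DecimalSI)
//	threshold := api.Percentage(40) // thresholds live in the <0;100> interval
//	capped := resource.NewMilliQuantity(
//		int64(float64(threshold)*0.01*float64(capacity.MilliValue())),
//		resource.DecimalSI,
//	)
//	_ = capped // capped.String() == "3200m"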
// assessAvailableResourceInNodes computes the available resources in all the
// nodes. this is done by summing up all the available resources in all the
// nodes and then subtracting the usage from it.
func assessAvailableResourceInNodes(
nodes []NodeInfo, resources []v1.ResourceName,
) (api.ReferencedResourceList, error) {
// available holds a sum of all the resources that can be used to move
// pods around. e.g. the sum of all available cpu and memory in all
// cluster nodes.
available := api.ReferencedResourceList{}
for _, node := range nodes {
for _, resourceName := range resources {
if _, exists := node.usage[resourceName]; !exists {
return nil, fmt.Errorf(
"unable to find %s resource in node's %s usage, terminating eviction",
resourceName, node.node.Name,
)
}

// XXX this should never happen. we better bail out
// here than hard crash with a segfault.
if node.usage[resourceName] == nil {
return nil, fmt.Errorf(
"unable to find %s usage resources, terminating eviction",
resourceName,
)
}

// keep the current usage around so we can subtract it
// from the available resources.
usage := *node.usage[resourceName]

// first time seeing this resource, initialize it.
if _, ok := available[resourceName]; !ok {
available[resourceName] = resource.NewQuantity(
0, resource.DecimalSI,
)
}

// XXX this should never happen. we better bail out
// here than hard crash with a segfault.
if node.available[resourceName] == nil {
return nil, fmt.Errorf(
"unable to find %s available resources, terminating eviction",
resourceName,
)
}

// now we add the capacity and then subtract the usage.
available[resourceName].Add(*node.available[resourceName])
available[resourceName].Sub(usage)
}
}

return available, nil
}