mirror of https://github.com/kubernetes-sigs/descheduler.git synced 2026-01-26 05:14:13 +01:00

Merge pull request #1189 from harshanarayana/feature/enable-otel

feat: Enable open telemetry tracing
Authored by Kubernetes Prow Robot on 2023-07-18 04:43:10 -07:00, committed by GitHub
15 changed files with 375 additions and 9 deletions

View File

@@ -43,6 +43,10 @@ IMAGE:=descheduler:$(VERSION)
# IMAGE_GCLOUD is the image name of descheduler in the remote registry
IMAGE_GCLOUD:=$(REGISTRY)/descheduler:$(VERSION)
# CURRENT_DIR is the current dir where the Makefile exists
CURRENT_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))
# TODO: upload binaries to GCS bucket
#
# In the future binaries can be uploaded to
@@ -129,6 +133,9 @@ gen:
./hack/update-generated-defaulters.sh
./hack/update-docs.sh
gen-docker:
$(CONTAINER_ENGINE) run --entrypoint make -it -v $(CURRENT_DIR):/go/src/sigs.k8s.io/descheduler -w /go/src/sigs.k8s.io/descheduler golang:1.20.3 gen
verify-gen:
./hack/verify-conversions.sh
./hack/verify-deep-copies.sh

View File

@@ -82,6 +82,13 @@ deschedulerPolicy:
# maxNoOfPodsToEvictPerNamespace: 10
# ignorePvcPods: true
# evictLocalStoragePods: true
# tracing:
# collectorEndpoint: otel-collector.observability.svc.cluster.local:4317
# transportCert: ""
# serviceName: ""
# serviceNamespace: ""
# sampleRate: 1.0
# fallbackToNoOpProviderOnError: true
strategies:
RemoveDuplicates:
enabled: true

View File

@@ -29,6 +29,7 @@ import (
"sigs.k8s.io/descheduler/pkg/apis/componentconfig"
"sigs.k8s.io/descheduler/pkg/apis/componentconfig/v1alpha1"
deschedulerscheme "sigs.k8s.io/descheduler/pkg/descheduler/scheme"
"sigs.k8s.io/descheduler/pkg/tracing"
)
const (
@@ -74,7 +75,9 @@ func newDefaultComponentConfig() (*componentconfig.DeschedulerConfiguration, err
},
}
deschedulerscheme.Scheme.Default(&versionedCfg)
cfg := componentconfig.DeschedulerConfiguration{}
cfg := componentconfig.DeschedulerConfiguration{
Tracing: componentconfig.TracingConfiguration{},
}
if err := deschedulerscheme.Scheme.Convert(&versionedCfg, &cfg, nil); err != nil {
return nil, err
}
@@ -92,6 +95,12 @@ func (rs *DeschedulerServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&rs.PolicyConfigFile, "policy-config-file", rs.PolicyConfigFile, "File with descheduler policy configuration.")
fs.BoolVar(&rs.DryRun, "dry-run", rs.DryRun, "Execute descheduler in dry run mode.")
fs.BoolVar(&rs.DisableMetrics, "disable-metrics", rs.DisableMetrics, "Disables metrics. The metrics are by default served through https://localhost:10258/metrics. Secure address, resp. port can be changed through --bind-address, resp. --secure-port flags.")
fs.StringVar(&rs.Tracing.CollectorEndpoint, "otel-collector-endpoint", "", "Set this flag to the OpenTelemetry Collector Service Address")
fs.StringVar(&rs.Tracing.TransportCert, "otel-transport-ca-cert", "", "Path of the CA certificate used to establish a secure gRPC connection to the OTEL collector")
fs.StringVar(&rs.Tracing.ServiceName, "otel-service-name", tracing.DefaultServiceName, "OTEL Trace name to be used with the resources")
fs.StringVar(&rs.Tracing.ServiceNamespace, "otel-trace-namespace", "", "OTEL Trace namespace to be used with the resources")
fs.Float64Var(&rs.Tracing.SampleRate, "otel-sample-rate", 1.0, "Sample rate at which traces are collected")
fs.BoolVar(&rs.Tracing.FallbackToNoOpProviderOnError, "otel-fallback-no-op-on-error", false, "Fall back to a NoOp tracer in case of error")
componentbaseoptions.BindLeaderElectionFlags(&rs.LeaderElection, fs)

View File

@@ -28,6 +28,7 @@ import (
"sigs.k8s.io/descheduler/cmd/descheduler/app/options"
"sigs.k8s.io/descheduler/pkg/descheduler"
"sigs.k8s.io/descheduler/pkg/tracing"
"github.com/spf13/cobra"
@@ -112,6 +113,11 @@ func NewDeschedulerCommand(out io.Writer) *cobra.Command {
}
func Run(ctx context.Context, rs *options.DeschedulerServer) error {
err := tracing.NewTracerProvider(ctx, rs.Tracing.CollectorEndpoint, rs.Tracing.TransportCert, rs.Tracing.ServiceName, rs.Tracing.ServiceNamespace, rs.Tracing.SampleRate, rs.Tracing.FallbackToNoOpProviderOnError)
if err != nil {
return err
}
defer tracing.Shutdown(ctx)
// increase the fake watch channel so the dry-run mode can be run
// over a cluster with thousands of pods
watch.DefaultChanSize = 100000

View File

@@ -32,6 +32,12 @@ descheduler [flags]
--leader-elect-resource-namespace string The namespace of resource object that is used for locking during leader election. (default "kube-system")
--leader-elect-retry-period duration The duration the clients should wait between attempting acquisition and renewal of a leadership. This is only applicable if leader election is enabled. (default 26s)
--logging-format string Sets the log format. Permitted formats: "text", "json". Non-default formats don't honor these flags: --add-dir-header, --alsologtostderr, --log-backtrace-at, --log_dir, --log_file, --log_file_max_size, --logtostderr, --skip-headers, --skip-log-headers, --stderrthreshold, --log-flush-frequency. Non-default choices are currently alpha and subject to change without warning. (default "text")
--otel-collector-endpoint string Set this flag to the OpenTelemetry Collector Service Address
--otel-fallback-no-op-on-error Fall back to a NoOp tracer in case of error
--otel-sample-rate float Sample rate at which traces are collected (default 1)
--otel-service-name string OTEL Trace name to be used with the resources (default "descheduler")
--otel-trace-namespace string OTEL Trace namespace to be used with the resources
--otel-transport-ca-cert string Path of the CA certificate used to establish a secure gRPC connection to the OTEL collector
--permit-address-sharing If true, SO_REUSEADDR will be used when binding the port. This allows binding to wildcard IPs like 0.0.0.0 and specific IPs in parallel, and it avoids waiting for the kernel to release sockets in TIME_WAIT state. [default=false]
--permit-port-sharing If true, SO_REUSEPORT will be used when binding the port, which allows more than one instance to bind on the same address and port. [default=false]
--policy-config-file string File with descheduler policy configuration.

go.mod
View File

@@ -7,6 +7,12 @@ require (
github.com/google/go-cmp v0.5.9
github.com/spf13/cobra v1.6.0
github.com/spf13/pflag v1.0.5
go.opentelemetry.io/otel v1.10.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.10.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.10.0
go.opentelemetry.io/otel/sdk v1.10.0
go.opentelemetry.io/otel/trace v1.10.0
google.golang.org/grpc v1.51.0
k8s.io/api v0.27.0
k8s.io/apimachinery v0.27.0
k8s.io/apiserver v0.27.0
@@ -75,13 +81,8 @@ require (
go.etcd.io/etcd/client/v3 v3.5.7 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.35.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.35.1 // indirect
go.opentelemetry.io/otel v1.10.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.10.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.10.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.10.0 // indirect
go.opentelemetry.io/otel/metric v0.31.0 // indirect
go.opentelemetry.io/otel/sdk v1.10.0 // indirect
go.opentelemetry.io/otel/trace v1.10.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
@@ -98,7 +99,6 @@ require (
golang.org/x/tools v0.7.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220502173005-c8bf987b8c21 // indirect
google.golang.org/grpc v1.51.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect

View File

@@ -55,6 +55,9 @@ type DeschedulerConfiguration struct {
// IgnorePVCPods sets whether PVC pods should be allowed to be evicted
IgnorePVCPods bool
// Tracing specifies the options for tracing.
Tracing TracingConfiguration
// LeaderElection starts Deployment using leader election loop
LeaderElection componentbaseconfig.LeaderElectionConfiguration
@@ -66,3 +69,25 @@ type DeschedulerConfiguration struct {
// Refer to [ClientConnection](https://pkg.go.dev/k8s.io/kubernetes/pkg/apis/componentconfig#ClientConnectionConfiguration) for more information.
ClientConnection componentbaseconfig.ClientConnectionConfiguration
}
type TracingConfiguration struct {
// CollectorEndpoint is the address of the OpenTelemetry collector.
// If not specified, the NoOp trace provider is used.
CollectorEndpoint string
// TransportCert is the path to the CA certificate file used to connect to the OpenTelemetry collector.
// If not specified, the provider starts in insecure mode.
TransportCert string
// ServiceName is the name of the service to be used in the OpenTelemetry collector.
// If not specified, the default value is "descheduler".
ServiceName string
// ServiceNamespace is the namespace of the service to be used in the OpenTelemetry collector.
// If not specified, the default namespace is used.
ServiceNamespace string
// SampleRate configures the sampling ratio for OTEL trace collection. It is passed to a ratio-based
// sampler: a value >= 1.0 samples everything, a value <= 0 samples nothing, and values in between
// are the fraction of traces to sample.
SampleRate float64
// FallbackToNoOpProviderOnError makes the trace provider fall back to a no-op provider when the
// configured endpoint-based provider cannot be set up.
FallbackToNoOpProviderOnError bool
}
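For orientation, here is a minimal sketch (not part of the diff) of a DeschedulerConfiguration with the tracing section populated; the endpoint mirrors the commented example in the Helm values above, and every field name comes from the struct just defined. The package main/fmt scaffolding is only there to make the snippet self-contained.

package main

import (
	"fmt"

	"sigs.k8s.io/descheduler/pkg/apis/componentconfig"
)

func main() {
	cfg := componentconfig.DeschedulerConfiguration{
		Tracing: componentconfig.TracingConfiguration{
			CollectorEndpoint:             "otel-collector.observability.svc.cluster.local:4317",
			TransportCert:                 "", // empty path keeps the gRPC transport insecure
			ServiceName:                   "descheduler",
			ServiceNamespace:              "", // optional; left empty as in the Helm example
			SampleRate:                    1.0, // >= 1.0 samples every trace
			FallbackToNoOpProviderOnError: true,
		},
	}
	fmt.Printf("tracing config: %+v\n", cfg.Tracing)
}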

View File

@@ -55,6 +55,9 @@ type DeschedulerConfiguration struct {
// IgnorePVCPods sets whether PVC pods should be allowed to be evicted
IgnorePVCPods bool `json:"ignorePvcPods,omitempty"`
// Tracing is used to setup the required OTEL tracing configuration
Tracing TracingConfiguration `json:"tracing,omitempty"`
// LeaderElection starts Deployment using leader election loop
LeaderElection componentbaseconfig.LeaderElectionConfiguration `json:"leaderElection,omitempty"`
@@ -66,3 +69,25 @@ type DeschedulerConfiguration struct {
// Refer to [ClientConnection](https://pkg.go.dev/k8s.io/kubernetes/pkg/apis/componentconfig#ClientConnectionConfiguration) for more information.
ClientConnection componentbaseconfig.ClientConnectionConfiguration `json:"clientConnection,omitempty"`
}
type TracingConfiguration struct {
// CollectorEndpoint is the address of the OpenTelemetry collector.
// If not specified, the NoOp trace provider is used.
CollectorEndpoint string `json:"collectorEndpoint"`
// TransportCert is the path to the CA certificate file used to connect to the OpenTelemetry collector.
// If not specified, the provider starts in insecure mode.
TransportCert string `json:"transportCert,omitempty"`
// ServiceName is the name of the service to be used in the OpenTelemetry collector.
// If not specified, the default value is "descheduler".
ServiceName string `json:"serviceName,omitempty"`
// ServiceNamespace is the namespace of the service to be used in the OpenTelemetry collector.
// If not specified, the default namespace is used.
ServiceNamespace string `json:"serviceNamespace,omitempty"`
// SampleRate configures the sampling ratio for OTEL trace collection. It is passed to a ratio-based
// sampler: a value >= 1.0 samples everything, a value <= 0 samples nothing, and values in between
// are the fraction of traces to sample.
SampleRate float64 `json:"sampleRate"`
// FallbackToNoOpProviderOnError makes the trace provider fall back to a no-op provider when the
// configured endpoint-based provider cannot be set up.
FallbackToNoOpProviderOnError bool `json:"fallbackToNoOpProviderOnError"`
}
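As a quick illustration of the JSON tags above (a sketch, not taken from the PR — the descheduler itself loads this through its defaulting and conversion scheme), the tracing block of a configuration file decodes into this v1alpha1 type with plain encoding/json:

package main

import (
	"encoding/json"
	"fmt"

	"sigs.k8s.io/descheduler/pkg/apis/componentconfig/v1alpha1"
)

func main() {
	raw := []byte(`{
		"collectorEndpoint": "otel-collector.observability.svc.cluster.local:4317",
		"sampleRate": 1.0,
		"fallbackToNoOpProviderOnError": true
	}`)
	var tc v1alpha1.TracingConfiguration
	if err := json.Unmarshal(raw, &tc); err != nil {
		panic(err)
	}
	// Omitted keys such as serviceName simply stay zero-valued here.
	fmt.Printf("%+v\n", tc)
}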

View File

@@ -46,6 +46,16 @@ func RegisterConversions(s *runtime.Scheme) error {
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*TracingConfiguration)(nil), (*componentconfig.TracingConfiguration)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1alpha1_TracingConfiguration_To_componentconfig_TracingConfiguration(a.(*TracingConfiguration), b.(*componentconfig.TracingConfiguration), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*componentconfig.TracingConfiguration)(nil), (*TracingConfiguration)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_componentconfig_TracingConfiguration_To_v1alpha1_TracingConfiguration(a.(*componentconfig.TracingConfiguration), b.(*TracingConfiguration), scope)
}); err != nil {
return err
}
return nil
}
@@ -58,6 +68,9 @@ func autoConvert_v1alpha1_DeschedulerConfiguration_To_componentconfig_Deschedule
out.MaxNoOfPodsToEvictPerNode = in.MaxNoOfPodsToEvictPerNode
out.EvictLocalStoragePods = in.EvictLocalStoragePods
out.IgnorePVCPods = in.IgnorePVCPods
if err := Convert_v1alpha1_TracingConfiguration_To_componentconfig_TracingConfiguration(&in.Tracing, &out.Tracing, s); err != nil {
return err
}
out.LeaderElection = in.LeaderElection
out.Logging = in.Logging
out.ClientConnection = in.ClientConnection
@@ -78,6 +91,9 @@ func autoConvert_componentconfig_DeschedulerConfiguration_To_v1alpha1_Deschedule
out.MaxNoOfPodsToEvictPerNode = in.MaxNoOfPodsToEvictPerNode
out.EvictLocalStoragePods = in.EvictLocalStoragePods
out.IgnorePVCPods = in.IgnorePVCPods
if err := Convert_componentconfig_TracingConfiguration_To_v1alpha1_TracingConfiguration(&in.Tracing, &out.Tracing, s); err != nil {
return err
}
out.LeaderElection = in.LeaderElection
out.Logging = in.Logging
out.ClientConnection = in.ClientConnection
@@ -88,3 +104,33 @@ func autoConvert_componentconfig_DeschedulerConfiguration_To_v1alpha1_Deschedule
func Convert_componentconfig_DeschedulerConfiguration_To_v1alpha1_DeschedulerConfiguration(in *componentconfig.DeschedulerConfiguration, out *DeschedulerConfiguration, s conversion.Scope) error {
return autoConvert_componentconfig_DeschedulerConfiguration_To_v1alpha1_DeschedulerConfiguration(in, out, s)
}
func autoConvert_v1alpha1_TracingConfiguration_To_componentconfig_TracingConfiguration(in *TracingConfiguration, out *componentconfig.TracingConfiguration, s conversion.Scope) error {
out.CollectorEndpoint = in.CollectorEndpoint
out.TransportCert = in.TransportCert
out.ServiceName = in.ServiceName
out.ServiceNamespace = in.ServiceNamespace
out.SampleRate = in.SampleRate
out.FallbackToNoOpProviderOnError = in.FallbackToNoOpProviderOnError
return nil
}
// Convert_v1alpha1_TracingConfiguration_To_componentconfig_TracingConfiguration is an autogenerated conversion function.
func Convert_v1alpha1_TracingConfiguration_To_componentconfig_TracingConfiguration(in *TracingConfiguration, out *componentconfig.TracingConfiguration, s conversion.Scope) error {
return autoConvert_v1alpha1_TracingConfiguration_To_componentconfig_TracingConfiguration(in, out, s)
}
func autoConvert_componentconfig_TracingConfiguration_To_v1alpha1_TracingConfiguration(in *componentconfig.TracingConfiguration, out *TracingConfiguration, s conversion.Scope) error {
out.CollectorEndpoint = in.CollectorEndpoint
out.TransportCert = in.TransportCert
out.ServiceName = in.ServiceName
out.ServiceNamespace = in.ServiceNamespace
out.SampleRate = in.SampleRate
out.FallbackToNoOpProviderOnError = in.FallbackToNoOpProviderOnError
return nil
}
// Convert_componentconfig_TracingConfiguration_To_v1alpha1_TracingConfiguration is an autogenerated conversion function.
func Convert_componentconfig_TracingConfiguration_To_v1alpha1_TracingConfiguration(in *componentconfig.TracingConfiguration, out *TracingConfiguration, s conversion.Scope) error {
return autoConvert_componentconfig_TracingConfiguration_To_v1alpha1_TracingConfiguration(in, out, s)
}

View File

@@ -29,6 +29,7 @@ import (
func (in *DeschedulerConfiguration) DeepCopyInto(out *DeschedulerConfiguration) {
*out = *in
out.TypeMeta = in.TypeMeta
out.Tracing = in.Tracing
out.LeaderElection = in.LeaderElection
in.Logging.DeepCopyInto(&out.Logging)
out.ClientConnection = in.ClientConnection
@@ -52,3 +53,19 @@ func (in *DeschedulerConfiguration) DeepCopyObject() runtime.Object {
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *TracingConfiguration) DeepCopyInto(out *TracingConfiguration) {
*out = *in
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TracingConfiguration.
func (in *TracingConfiguration) DeepCopy() *TracingConfiguration {
if in == nil {
return nil
}
out := new(TracingConfiguration)
in.DeepCopyInto(out)
return out
}

View File

@@ -29,6 +29,7 @@ import (
func (in *DeschedulerConfiguration) DeepCopyInto(out *DeschedulerConfiguration) {
*out = *in
out.TypeMeta = in.TypeMeta
out.Tracing = in.Tracing
out.LeaderElection = in.LeaderElection
in.Logging.DeepCopyInto(&out.Logging)
out.ClientConnection = in.ClientConnection
@@ -52,3 +53,19 @@ func (in *DeschedulerConfiguration) DeepCopyObject() runtime.Object {
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *TracingConfiguration) DeepCopyInto(out *TracingConfiguration) {
*out = *in
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TracingConfiguration.
func (in *TracingConfiguration) DeepCopy() *TracingConfiguration {
if in == nil {
return nil
}
out := new(TracingConfiguration)
in.DeepCopyInto(out)
return out
}

View File

@@ -25,6 +25,8 @@ import (
"strings"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"k8s.io/client-go/discovery"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/events"
@@ -46,6 +48,7 @@ import (
"sigs.k8s.io/descheduler/pkg/descheduler/client"
eutils "sigs.k8s.io/descheduler/pkg/descheduler/evictions/utils"
nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
"sigs.k8s.io/descheduler/pkg/tracing"
"sigs.k8s.io/descheduler/pkg/utils"
"sigs.k8s.io/descheduler/pkg/version"
@@ -106,6 +109,9 @@ func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedu
}
func (d *descheduler) runDeschedulerLoop(ctx context.Context, nodes []*v1.Node) error {
var span trace.Span
ctx, span = tracing.Tracer().Start(ctx, "runDeschedulerLoop")
defer span.End()
loopStartDuration := time.Now()
defer metrics.DeschedulerLoopDuration.With(map[string]string{}).Observe(time.Since(loopStartDuration).Seconds())
@@ -169,6 +175,9 @@ func (d *descheduler) runDeschedulerLoop(ctx context.Context, nodes []*v1.Node)
// later runs through all balance plugins of all profiles. (All Balance plugins should come after all Deschedule plugins)
// see https://github.com/kubernetes-sigs/descheduler/issues/979
func (d *descheduler) runProfiles(ctx context.Context, client clientset.Interface, nodes []*v1.Node, podEvictor *evictions.PodEvictor) {
var span trace.Span
ctx, span = tracing.Tracer().Start(ctx, "runProfiles")
defer span.End()
var profileRunners []profileRunner
for _, profile := range d.deschedulerPolicy.Profiles {
currProfile, err := frameworkprofile.NewProfile(
@@ -190,6 +199,7 @@ func (d *descheduler) runProfiles(ctx context.Context, client clientset.Interfac
// First deschedule
status := profileR.descheduleEPs(ctx, nodes)
if status != nil && status.Err != nil {
span.AddEvent("failed to perform deschedule operations", trace.WithAttributes(attribute.String("err", status.Err.Error()), attribute.String("profile", profileR.name), attribute.String("operation", tracing.DescheduleOperation)))
klog.ErrorS(status.Err, "running deschedule extension point failed with error", "profile", profileR.name)
continue
}
@@ -199,6 +209,7 @@ func (d *descheduler) runProfiles(ctx context.Context, client clientset.Interfac
// Balance Later
status := profileR.balanceEPs(ctx, nodes)
if status != nil && status.Err != nil {
span.AddEvent("failed to perform balance operations", trace.WithAttributes(attribute.String("err", status.Err.Error()), attribute.String("profile", profileR.name), attribute.String("operation", tracing.BalanceOperation)))
klog.ErrorS(status.Err, "running balance extension point failed with error", "profile", profileR.name)
continue
}
@@ -206,6 +217,9 @@ func (d *descheduler) runProfiles(ctx context.Context, client clientset.Interfac
}
func Run(ctx context.Context, rs *options.DeschedulerServer) error {
var span trace.Span
ctx, span = tracing.Tracer().Start(ctx, "Run")
defer span.End()
metrics.Register()
clientConnection := rs.ClientConnection
@@ -242,6 +256,7 @@ func Run(ctx context.Context, rs *options.DeschedulerServer) error {
}
if rs.LeaderElection.LeaderElect && rs.DeschedulingInterval.Seconds() == 0 {
span.AddEvent("Validation Failure", trace.WithAttributes(attribute.String("err", "leaderElection must be used with deschedulingInterval")))
return fmt.Errorf("leaderElection must be used with deschedulingInterval")
}
@@ -251,6 +266,7 @@ func Run(ctx context.Context, rs *options.DeschedulerServer) error {
if rs.LeaderElection.LeaderElect && !rs.DryRun {
if err := NewLeaderElection(runFn, rsclient, &rs.LeaderElection, ctx); err != nil {
span.AddEvent("Leader Election Failure", trace.WithAttributes(attribute.String("err", err.Error())))
return fmt.Errorf("leaderElection: %w", err)
}
return nil
@@ -366,6 +382,9 @@ func cachedClient(
}
func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string) error {
var span trace.Span
ctx, span = tracing.Tracer().Start(ctx, "RunDeschedulerStrategies")
defer span.End()
sharedInformerFactory := informers.NewSharedInformerFactory(rs.Client, 0)
nodeLister := sharedInformerFactory.Core().V1().Nodes().Lister()
@@ -385,6 +404,7 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
descheduler, err := newDescheduler(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion, eventRecorder, sharedInformerFactory)
if err != nil {
span.AddEvent("Failed to create new descheduler", trace.WithAttributes(attribute.String("err", err.Error())))
return err
}
ctx, cancel := context.WithCancel(ctx)
@@ -394,14 +414,19 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
sharedInformerFactory.WaitForCacheSync(ctx.Done())
wait.NonSlidingUntil(func() {
nodes, err := nodeutil.ReadyNodes(ctx, rs.Client, nodeLister, nodeSelector)
// A new context is created here intentionally to avoid nesting the spans via context.
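// Each iteration's "NonSlidingUntil" span is therefore a direct child of the
// "RunDeschedulerStrategies" span, rather than nesting under the previous iteration's span.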
sCtx, sSpan := tracing.Tracer().Start(ctx, "NonSlidingUntil")
defer sSpan.End()
nodes, err := nodeutil.ReadyNodes(sCtx, rs.Client, nodeLister, nodeSelector)
if err != nil {
sSpan.AddEvent("Failed to detect ready nodes", trace.WithAttributes(attribute.String("err", err.Error())))
klog.Error(err)
cancel()
return
}
err = descheduler.runDeschedulerLoop(ctx, nodes)
err = descheduler.runDeschedulerLoop(sCtx, nodes)
if err != nil {
sSpan.AddEvent("Failed to run descheduler loop", trace.WithAttributes(attribute.String("err", err.Error())))
klog.Error(err)
cancel()
return

View File

@@ -20,6 +20,8 @@ import (
"context"
"fmt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -30,6 +32,7 @@ import (
"sigs.k8s.io/descheduler/metrics"
eutils "sigs.k8s.io/descheduler/pkg/descheduler/evictions/utils"
"sigs.k8s.io/descheduler/pkg/tracing"
)
// nodePodEvictedCount keeps count of pods evicted on node
@@ -113,6 +116,9 @@ type EvictOptions struct {
// EvictPod evicts a pod while exercising eviction limits.
// Returns true when the pod is evicted on the server side.
func (pe *PodEvictor) EvictPod(ctx context.Context, pod *v1.Pod, opts EvictOptions) bool {
var span trace.Span
ctx, span = tracing.Tracer().Start(ctx, "EvictPod", trace.WithAttributes(attribute.String("podName", pod.Name), attribute.String("podNamespace", pod.Namespace), attribute.String("reason", opts.Reason), attribute.String("operation", tracing.EvictOperation)))
defer span.End()
// TODO: Replace context-propagated Strategy name with a defined framework handle for accessing Strategy info
strategy := ""
if ctx.Value("strategyName") != nil {
@@ -124,6 +130,7 @@ func (pe *PodEvictor) EvictPod(ctx context.Context, pod *v1.Pod, opts EvictOptio
if pe.metricsEnabled {
metrics.PodsEvicted.With(map[string]string{"result": "maximum number of pods per node reached", "strategy": strategy, "namespace": pod.Namespace, "node": pod.Spec.NodeName}).Inc()
}
span.AddEvent("Eviction Failed", trace.WithAttributes(attribute.String("node", pod.Spec.NodeName), attribute.String("err", "Maximum number of evicted pods per node reached")))
klog.ErrorS(fmt.Errorf("Maximum number of evicted pods per node reached"), "limit", *pe.maxPodsToEvictPerNode, "node", pod.Spec.NodeName)
return false
}
@@ -133,6 +140,7 @@ func (pe *PodEvictor) EvictPod(ctx context.Context, pod *v1.Pod, opts EvictOptio
if pe.metricsEnabled {
metrics.PodsEvicted.With(map[string]string{"result": "maximum number of pods per namespace reached", "strategy": strategy, "namespace": pod.Namespace, "node": pod.Spec.NodeName}).Inc()
}
span.AddEvent("Eviction Failed", trace.WithAttributes(attribute.String("node", pod.Spec.NodeName), attribute.String("err", "Maximum number of evicted pods per namespace reached")))
klog.ErrorS(fmt.Errorf("Maximum number of evicted pods per namespace reached"), "limit", *pe.maxPodsToEvictPerNamespace, "namespace", pod.Namespace)
return false
}
@@ -140,6 +148,7 @@ func (pe *PodEvictor) EvictPod(ctx context.Context, pod *v1.Pod, opts EvictOptio
err := evictPod(ctx, pe.client, pod, pe.policyGroupVersion)
if err != nil {
// err is used only for logging purposes
span.AddEvent("Eviction Failed", trace.WithAttributes(attribute.String("node", pod.Spec.NodeName), attribute.String("err", err.Error())))
klog.ErrorS(err, "Error evicting pod", "pod", klog.KObj(pod), "reason", opts.Reason)
if pe.metricsEnabled {
metrics.PodsEvicted.With(map[string]string{"result": "error", "strategy": strategy, "namespace": pod.Namespace, "node": pod.Spec.NodeName}).Inc()

View File

@@ -18,12 +18,15 @@ import (
"fmt"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"sigs.k8s.io/descheduler/metrics"
"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
"sigs.k8s.io/descheduler/pkg/framework/pluginregistry"
frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
"sigs.k8s.io/descheduler/pkg/tracing"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/errors"
@@ -300,6 +303,9 @@ func NewProfile(config api.DeschedulerProfile, reg pluginregistry.Registry, opts
func (d profileImpl) RunDeschedulePlugins(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status {
errs := []error{}
for _, pl := range d.deschedulePlugins {
var span trace.Span
ctx, span = tracing.Tracer().Start(ctx, pl.Name(), trace.WithAttributes(attribute.String("plugin", pl.Name()), attribute.String("prpfile", d.profileName), attribute.String("operation", tracing.DescheduleOperation)))
defer span.End()
evicted := d.podEvictor.TotalEvicted()
// TODO: strategyName should be accessible from within the strategy using a framework
// handle or function which the Evictor has access to. For migration/in-progress framework
@@ -311,6 +317,7 @@ func (d profileImpl) RunDeschedulePlugins(ctx context.Context, nodes []*v1.Node)
metrics.DeschedulerStrategyDuration.With(map[string]string{"strategy": pl.Name(), "profile": d.profileName}).Observe(time.Since(strategyStart).Seconds())
if status != nil && status.Err != nil {
span.AddEvent("Plugin Execution Failed", trace.WithAttributes(attribute.String("err", status.Err.Error())))
errs = append(errs, fmt.Errorf("plugin %q finished with error: %v", pl.Name(), status.Err))
}
klog.V(1).InfoS("Total number of pods evicted", "extension point", "Deschedule", "evictedPods", d.podEvictor.TotalEvicted()-evicted)
@@ -329,6 +336,9 @@ func (d profileImpl) RunDeschedulePlugins(ctx context.Context, nodes []*v1.Node)
func (d profileImpl) RunBalancePlugins(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status {
errs := []error{}
for _, pl := range d.balancePlugins {
var span trace.Span
ctx, span = tracing.Tracer().Start(ctx, pl.Name(), trace.WithAttributes(attribute.String("plugin", pl.Name()), attribute.String("profile", d.profileName), attribute.String("operation", tracing.BalanceOperation)))
defer span.End()
evicted := d.podEvictor.TotalEvicted()
// TODO: strategyName should be accessible from within the strategy using a framework
// handle or function which the Evictor has access to. For migration/in-progress framework
@@ -340,6 +350,7 @@ func (d profileImpl) RunBalancePlugins(ctx context.Context, nodes []*v1.Node) *f
metrics.DeschedulerStrategyDuration.With(map[string]string{"strategy": pl.Name(), "profile": d.profileName}).Observe(time.Since(strategyStart).Seconds())
if status != nil && status.Err != nil {
span.AddEvent("Plugin Execution Failed", trace.WithAttributes(attribute.String("err", status.Err.Error())))
errs = append(errs, fmt.Errorf("plugin %q finished with error: %v", pl.Name(), status.Err))
}
klog.V(1).InfoS("Total number of pods evicted", "extension point", "Balance", "evictedPods", d.podEvictor.TotalEvicted()-evicted)

pkg/tracing/tracing.go (new file)
View File

@@ -0,0 +1,156 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tracing
import (
"context"
"crypto/x509"
"os"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
sdkresource "go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/credentials"
"k8s.io/klog/v2"
)
const (
// DefaultServiceName is the default service name used for tracing.
DefaultServiceName = "descheduler"
// DescheduleOperation is the operation name used for Deschedule() functions.
DescheduleOperation = "deschedule"
// BalanceOperation is the operation name used for Balance() functions.
BalanceOperation = "balance"
// EvictOperation is the operation name used for Evict() functions.
EvictOperation = "evict"
// TracerName is used to setup the named tracer
TracerName = "sigs.k8s.io/descheduler"
)
var (
tracer trace.Tracer
provider trace.TracerProvider
)
func init() {
provider = trace.NewNoopTracerProvider()
tracer = provider.Tracer(TracerName)
}
func Tracer() trace.Tracer {
return tracer
}
// NewTracerProvider creates a new trace provider with the given options.
func NewTracerProvider(ctx context.Context, endpoint, caCert, name, namespace string, sampleRate float64, fallbackToNoOpTracer bool) (err error) {
defer func(p trace.TracerProvider) {
if err != nil && !fallbackToNoOpTracer {
return
}
if err != nil && fallbackToNoOpTracer {
klog.ErrorS(err, "ran into an error trying to setup a trace provider. Falling back to NoOp provider")
err = nil
provider = trace.NewNoopTracerProvider()
}
otel.SetTextMapPropagator(propagation.TraceContext{})
otel.SetTracerProvider(provider)
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
klog.ErrorS(err, "got error from opentelemetry")
}))
tracer = otel.GetTracerProvider().Tracer(TracerName)
}(provider)
if endpoint == "" {
klog.V(2).Info("Did not find a trace collector endpoint defined. Switching to NoopTraceProvider")
provider = trace.NewNoopTracerProvider()
return
}
var opts []otlptracegrpc.Option
opts = append(opts, otlptracegrpc.WithEndpoint(endpoint))
var data []byte
if caCert != "" {
data, err = os.ReadFile(caCert)
if err != nil {
klog.ErrorS(err, "failed to extract ca certificate required to generate the transport credentials")
return err
}
pool := x509.NewCertPool()
if !pool.AppendCertsFromPEM(data) {
klog.Error("failed to create cert pool using the ca certificate provided for generating the transport credentials")
return err
}
klog.Info("Enabling trace GRPC client in secure TLS mode")
opts = append(opts, otlptracegrpc.WithTLSCredentials(credentials.NewClientTLSFromCert(pool, "")))
} else {
klog.Info("Enabling trace GRPC client in insecure mode")
opts = append(opts, otlptracegrpc.WithInsecure())
}
client := otlptracegrpc.NewClient(opts...)
exporter, err := otlptrace.New(ctx, client)
if err != nil {
klog.ErrorS(err, "failed to create an instance of the trace exporter")
return err
}
if name == "" {
klog.V(5).InfoS("no name provided, using default service name for tracing", "name", DefaultServiceName)
name = DefaultServiceName
}
resourceOpts := []sdkresource.Option{sdkresource.WithAttributes(semconv.ServiceNameKey.String(name)), sdkresource.WithSchemaURL(semconv.SchemaURL), sdkresource.WithProcess()}
if namespace != "" {
resourceOpts = append(resourceOpts, sdkresource.WithAttributes(semconv.ServiceNamespaceKey.String(namespace)))
}
resource, err := sdkresource.New(ctx, resourceOpts...)
if err != nil {
klog.ErrorS(err, "failed to create traceable resource")
return err
}
spanProcessor := sdktrace.NewBatchSpanProcessor(exporter)
provider = sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(sampleRate))),
sdktrace.WithSpanProcessor(spanProcessor),
sdktrace.WithResource(resource),
)
klog.V(2).Info("Successfully setup trace provider")
return
}
// Shutdown shuts down the global trace exporter.
func Shutdown(ctx context.Context) error {
tp, ok := provider.(*sdktrace.TracerProvider)
if !ok {
return nil
}
if err := tp.Shutdown(ctx); err != nil {
otel.Handle(err)
klog.ErrorS(err, "failed to shutdown the trace exporter")
return err
}
return nil
}
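Putting the new package together, a usage sketch that mirrors the call sites added elsewhere in this PR (cmd/descheduler/app/server.go for provider setup and shutdown, pkg/descheduler for span creation); the endpoint value is the same illustrative one used in the Helm values, and the span name is hypothetical.

package main

import (
	"context"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"

	"sigs.k8s.io/descheduler/pkg/tracing"
)

func main() {
	ctx := context.Background()

	// Configure the global provider once at startup; an empty endpoint silently
	// selects the no-op provider, exactly as NewTracerProvider does above.
	if err := tracing.NewTracerProvider(ctx,
		"otel-collector.observability.svc.cluster.local:4317", // collector endpoint
		"",                         // no CA cert => insecure gRPC transport
		tracing.DefaultServiceName, // "descheduler"
		"",                         // service namespace (optional)
		1.0,                        // sample everything
		true,                       // fall back to the no-op provider on error
	); err != nil {
		panic(err)
	}
	defer tracing.Shutdown(ctx)

	// Spans are then started from the shared named tracer, as in runDeschedulerLoop and EvictPod.
	var span trace.Span
	ctx, span = tracing.Tracer().Start(ctx, "exampleOperation",
		trace.WithAttributes(attribute.String("operation", tracing.DescheduleOperation)))
	defer span.End()
}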