diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index a4c7530f72..ad2dd9158e 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -347,7 +347,8 @@ grpc_tls_config: # CLI flag: -server.grpc-max-send-msg-size-bytes [grpc_server_max_send_msg_size: | default = 4194304] -# Limit on the number of concurrent streams for gRPC calls (0 = unlimited) +# Limit on the number of concurrent streams for gRPC calls per client connection +# (0 = unlimited) # CLI flag: -server.grpc-max-concurrent-streams [grpc_server_max_concurrent_streams: | default = 100] diff --git a/go.mod b/go.mod index 46759b37b2..596006b430 100644 --- a/go.mod +++ b/go.mod @@ -49,7 +49,7 @@ require ( github.com/gorilla/mux v1.8.0 github.com/gorilla/websocket v1.5.0 github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 - github.com/grafana/dskit v0.0.0-20230823104051-002b55ecf009 + github.com/grafana/dskit v0.0.0-20230829141140-06955c011ffd github.com/grafana/go-gelf/v2 v2.0.1 github.com/grafana/gomemcache v0.0.0-20230316202710-a081dae0aba9 github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd diff --git a/go.sum b/go.sum index 1ba17a6592..749fd7c547 100644 --- a/go.sum +++ b/go.sum @@ -1081,8 +1081,8 @@ github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad github.com/gotestyourself/gotestyourself v2.2.0+incompatible/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY= github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 h1:qhugDMdQ4Vp68H0tp/0iN17DM2ehRo1rLEdOFe/gB8I= github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2/go.mod h1:w/aiO1POVIeXUQyl0VQSZjl5OAGDTL5aX+4v0RA1tcw= -github.com/grafana/dskit v0.0.0-20230823104051-002b55ecf009 h1:SRl8hp9MtwpkAwnh5/DsQmbwTgCKar8tR8rf5r9gi9U= -github.com/grafana/dskit v0.0.0-20230823104051-002b55ecf009/go.mod h1:3u7fr4hmOhuUL9Yc1QP/oa3za73kxvqJnRJH4BA5fOM= +github.com/grafana/dskit v0.0.0-20230829141140-06955c011ffd h1:RHZuBHWNS2HRJ5XhQK7cKP11EMMJPtJO2xKvQ+ws+PU= +github.com/grafana/dskit v0.0.0-20230829141140-06955c011ffd/go.mod h1:3u7fr4hmOhuUL9Yc1QP/oa3za73kxvqJnRJH4BA5fOM= github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQaWak= github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90= github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY= diff --git a/pkg/util/spanlogger/spanlogger.go b/pkg/util/spanlogger/spanlogger.go index 6209f8b7d4..dbbf7679f7 100644 --- a/pkg/util/spanlogger/spanlogger.go +++ b/pkg/util/spanlogger/spanlogger.go @@ -5,15 +5,11 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/spanlogger" - "github.com/opentracing/opentracing-go" - "github.com/grafana/dskit/tenant" util_log "github.com/grafana/loki/pkg/util/log" ) -type loggerCtxMarker struct{} - const ( // TenantIDsTagName is the tenant IDs tag name. TenantIDsTagName = spanlogger.TenantIDsTagName @@ -30,8 +26,7 @@ func (r *resolverProxy) TenantIDs(ctx context.Context) ([]string, error) { } var ( - loggerCtxKey = &loggerCtxMarker{} - resolver = &resolverProxy{} + resolver = &resolverProxy{} ) // SpanLogger unifies tracing and logging, to reduce repetition. @@ -50,7 +45,7 @@ func NewWithLogger(ctx context.Context, logger log.Logger, method string, kvps . // FromContext returns a SpanLogger using the current parent span. // If there is no parent span, the SpanLogger will only log to the logger -// within the context. If the context doesn't have a logger, the fallback +// within the context. If the context doesn't have a logger, the global // logger is used. func FromContext(ctx context.Context) *SpanLogger { return spanlogger.FromContext(ctx, util_log.Logger, resolver) @@ -61,16 +56,5 @@ func FromContext(ctx context.Context) *SpanLogger { // within the context. If the context doesn't have a logger, the fallback // logger is used. func FromContextWithFallback(ctx context.Context, fallback log.Logger) *SpanLogger { - logger, ok := ctx.Value(loggerCtxKey).(log.Logger) - if !ok { - logger = fallback - } - sp := opentracing.SpanFromContext(ctx) - if sp == nil { - sp = defaultNoopSpan - } - return &SpanLogger{ - Logger: util_log.WithContext(ctx, logger), - Span: sp, - } + return spanlogger.FromContext(ctx, fallback, resolver) } diff --git a/vendor/github.com/grafana/dskit/server/server.go b/vendor/github.com/grafana/dskit/server/server.go index 6b612a661e..726d62b488 100644 --- a/vendor/github.com/grafana/dskit/server/server.go +++ b/vendor/github.com/grafana/dskit/server/server.go @@ -168,7 +168,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.HTTPServerIdleTimeout, "server.http-idle-timeout", 120*time.Second, "Idle timeout for HTTP server") f.IntVar(&cfg.GPRCServerMaxRecvMsgSize, "server.grpc-max-recv-msg-size-bytes", 4*1024*1024, "Limit on the size of a gRPC message this server can receive (bytes).") f.IntVar(&cfg.GRPCServerMaxSendMsgSize, "server.grpc-max-send-msg-size-bytes", 4*1024*1024, "Limit on the size of a gRPC message this server can send (bytes).") - f.UintVar(&cfg.GPRCServerMaxConcurrentStreams, "server.grpc-max-concurrent-streams", 100, "Limit on the number of concurrent streams for gRPC calls (0 = unlimited)") + f.UintVar(&cfg.GPRCServerMaxConcurrentStreams, "server.grpc-max-concurrent-streams", 100, "Limit on the number of concurrent streams for gRPC calls per client connection (0 = unlimited)") f.DurationVar(&cfg.GRPCServerMaxConnectionIdle, "server.grpc.keepalive.max-connection-idle", infinty, "The duration after which an idle connection should be closed. Default: infinity") f.DurationVar(&cfg.GRPCServerMaxConnectionAge, "server.grpc.keepalive.max-connection-age", infinty, "The duration for the maximum amount of time a connection may exist before it will be closed. Default: infinity") f.DurationVar(&cfg.GRPCServerMaxConnectionAgeGrace, "server.grpc.keepalive.max-connection-age-grace", infinty, "An additive period after max-connection-age after which the connection will be forcibly closed. Default: infinity") diff --git a/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go b/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go index 9a063e0a4d..08653eda38 100644 --- a/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go +++ b/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go @@ -3,6 +3,8 @@ package spanlogger import ( "context" + "go.uber.org/atomic" // Really just need sync/atomic but there is a lint rule preventing it. + "github.com/go-kit/log" "github.com/go-kit/log/level" opentracing "github.com/opentracing/opentracing-go" @@ -33,9 +35,13 @@ var ( // SpanLogger unifies tracing and logging, to reduce repetition. type SpanLogger struct { - log.Logger + ctx context.Context // context passed in, with logger + resolver TenantResolver // passed in + baseLogger log.Logger // passed in + logger atomic.Pointer[log.Logger] // initialized on first use opentracing.Span - sampled bool + sampled bool + debugEnabled bool } // New makes a new SpanLogger with a log.Logger to send logs to. The provided context will have the logger attached @@ -45,14 +51,17 @@ func New(ctx context.Context, logger log.Logger, method string, resolver TenantR if ids, err := resolver.TenantIDs(ctx); err == nil && len(ids) > 0 { span.SetTag(TenantIDsTagName, ids) } - lwc, sampled := withContext(ctx, logger, resolver) + _, sampled := tracing.ExtractSampledTraceID(ctx) l := &SpanLogger{ - Logger: log.With(lwc, "method", method), - Span: span, - sampled: sampled, + ctx: ctx, + resolver: resolver, + baseLogger: log.With(logger, "method", method), + Span: span, + sampled: sampled, + debugEnabled: debugEnabled(logger), } if len(kvps) > 0 { - level.Debug(l).Log(kvps...) + l.DebugLog(kvps...) } ctx = context.WithValue(ctx, loggerCtxKey, logger) @@ -68,22 +77,52 @@ func FromContext(ctx context.Context, fallback log.Logger, resolver TenantResolv if !ok { logger = fallback } + sampled := false sp := opentracing.SpanFromContext(ctx) if sp == nil { sp = opentracing.NoopTracer{}.StartSpan("noop") + } else { + _, sampled = tracing.ExtractSampledTraceID(ctx) } - lwc, sampled := withContext(ctx, logger, resolver) return &SpanLogger{ - Logger: lwc, - Span: sp, - sampled: sampled, + ctx: ctx, + baseLogger: logger, + resolver: resolver, + Span: sp, + sampled: sampled, + debugEnabled: debugEnabled(logger), + } +} + +// Detect whether we should output debug logging. +// false iff the logger says it's not enabled; true if the logger doesn't say. +func debugEnabled(logger log.Logger) bool { + if x, ok := logger.(interface{ DebugEnabled() bool }); ok && !x.DebugEnabled() { + return false } + return true } // Log implements gokit's Logger interface; sends logs to underlying logger and // also puts the on the spans. func (s *SpanLogger) Log(kvps ...interface{}) error { - s.Logger.Log(kvps...) + s.getLogger().Log(kvps...) + return s.spanLog(kvps...) +} + +// DebugLog is more efficient than level.Debug().Log(). +// Also it swallows the error return because nobody checks for errors on debug logs. +func (s *SpanLogger) DebugLog(kvps ...interface{}) { + if s.debugEnabled { + // The call to Log() through an interface makes its argument escape, so make a copy here, + // in the debug-only path, so the function is faster for the non-debug path. + localCopy := append([]any{}, kvps...) + level.Debug(s.getLogger()).Log(localCopy...) + } + _ = s.spanLog(kvps...) +} + +func (s *SpanLogger) spanLog(kvps ...interface{}) error { if !s.sampled { return nil } @@ -105,16 +144,26 @@ func (s *SpanLogger) Error(err error) error { return err } -func withContext(ctx context.Context, logger log.Logger, resolver TenantResolver) (log.Logger, bool) { - userID, err := resolver.TenantID(ctx) +func (s *SpanLogger) getLogger() log.Logger { + pLogger := s.logger.Load() + if pLogger != nil { + return *pLogger + } + // If no logger stored in the pointer, start to make one. + logger := s.baseLogger + userID, err := s.resolver.TenantID(s.ctx) if err == nil && userID != "" { logger = log.With(logger, "user", userID) } - traceID, ok := tracing.ExtractSampledTraceID(ctx) - if !ok { - return logger, false + traceID, ok := tracing.ExtractSampledTraceID(s.ctx) + if ok { + logger = log.With(logger, "traceID", traceID) } - - return log.With(logger, "traceID", traceID), true + // If the value has been set by another goroutine, fetch that other value and discard the one we made. + if !s.logger.CompareAndSwap(nil, &logger) { + pLogger := s.logger.Load() + logger = *pLogger + } + return logger } diff --git a/vendor/modules.txt b/vendor/modules.txt index 3b352836d8..8a13f6aa40 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -836,7 +836,7 @@ github.com/gorilla/websocket # github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 ## explicit; go 1.17 github.com/grafana/cloudflare-go -# github.com/grafana/dskit v0.0.0-20230823104051-002b55ecf009 +# github.com/grafana/dskit v0.0.0-20230829141140-06955c011ffd ## explicit; go 1.19 github.com/grafana/dskit/aws github.com/grafana/dskit/backoff