From c1fada9af01fded6c4e08c5bd54917a8dad5d744 Mon Sep 17 00:00:00 2001 From: Dylan Guedes Date: Fri, 21 Jun 2024 15:24:39 -0300 Subject: [PATCH] refactor: Remove unnecessary spanlogger usage (#13255) Historically, we've been using `spanlogger` to enrich our spans. That isn't the right usage for it (as of now), as they'll be emitting log lines. Our current usage is causing: - Noisy/important log lines less visible - Unnecessary logging volume - Spread of the bad practice. With this PR I'm moving away from `spanlogger` all places where the log message is too bad or nonexistent. That's because these cases are indicating we don't care about that log line at all, and only about the data injected in the span. --- pkg/logql/engine.go | 4 +-- pkg/logql/metrics.go | 1 - pkg/logql/metrics_test.go | 2 +- pkg/logqlmodel/stats/context.go | 29 ++++++++------- pkg/querier/http.go | 35 +++++++++++-------- pkg/querier/querier.go | 17 +++++---- pkg/querier/queryrange/downstreamer.go | 5 +-- pkg/querier/queryrange/limits.go | 6 +--- .../queryrange/queryrangebase/query_range.go | 6 ++-- pkg/querier/queryrange/querysharding.go | 13 +++---- pkg/querier/queryrange/shard_resolver.go | 4 +-- pkg/querier/queryrange/stats.go | 4 ++- pkg/storage/async_store.go | 7 ++-- pkg/storage/chunk/cache/resultscache/cache.go | 11 ++---- .../chunk/client/util/parallel_chunk_fetch.go | 11 +++--- pkg/storage/stores/composite_store_entry.go | 7 +--- .../stores/series/series_index_store.go | 33 +++++++---------- .../stores/shipper/bloomshipper/client.go | 13 +++---- .../indexshipper/storage/cached_client.go | 28 ++++++++------- 19 files changed, 112 insertions(+), 124 deletions(-) diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index fcbfcb450e..8b46ed4d83 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -36,7 +36,6 @@ import ( "github.com/grafana/loki/v3/pkg/util/httpreq" logutil "github.com/grafana/loki/v3/pkg/util/log" "github.com/grafana/loki/v3/pkg/util/server" - 
"github.com/grafana/loki/v3/pkg/util/spanlogger" "github.com/grafana/loki/v3/pkg/util/validation" ) @@ -231,7 +230,6 @@ func (q *query) resultLength(res promql_parser.Value) int { func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "query.Exec") defer sp.Finish() - spLogger := spanlogger.FromContext(ctx) sp.LogKV( "type", GetRangeType(q.params), @@ -265,7 +263,7 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) { queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration) statResult := statsCtx.Result(time.Since(start), queueTime, q.resultLength(data)) - statResult.Log(level.Debug(spLogger)) + sp.LogKV(statResult.KVList()...) status, _ := server.ClientHTTPStatusAndError(err) diff --git a/pkg/logql/metrics.go b/pkg/logql/metrics.go index 052446c6b5..6f35f84b3a 100644 --- a/pkg/logql/metrics.go +++ b/pkg/logql/metrics.go @@ -377,7 +377,6 @@ func RecordStatsQueryMetrics(ctx context.Context, log log.Logger, start, end tim "query", query, "query_hash", util.HashedQuery(query), "total_entries", stats.Summary.TotalEntriesReturned) - level.Info(logger).Log(logValues...) 
execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime) diff --git a/pkg/logql/metrics_test.go b/pkg/logql/metrics_test.go index 44094e27f5..577627a202 100644 --- a/pkg/logql/metrics_test.go +++ b/pkg/logql/metrics_test.go @@ -92,8 +92,8 @@ func TestLogSlowQuery(t *testing.T) { func TestLogLabelsQuery(t *testing.T) { buf := bytes.NewBufferString("") - logger := log.NewLogfmtLogger(buf) tr, c := jaeger.NewTracer("foo", jaeger.NewConstSampler(true), jaeger.NewInMemoryReporter()) + logger := log.NewLogfmtLogger(buf) defer c.Close() opentracing.SetGlobalTracer(tr) sp := opentracing.StartSpan("") diff --git a/pkg/logqlmodel/stats/context.go b/pkg/logqlmodel/stats/context.go index 18794fb137..a0509be31f 100644 --- a/pkg/logqlmodel/stats/context.go +++ b/pkg/logqlmodel/stats/context.go @@ -26,6 +26,7 @@ import ( "time" "github.com/dustin/go-humanize" + "github.com/go-kit/log" ) @@ -518,9 +519,12 @@ func (c *Context) getCacheStatsByType(t CacheType) *Cache { return stats } -// Log logs a query statistics result. -func (r Result) Log(log log.Logger) { - _ = log.Log( +func (r Result) Log(logger log.Logger) { + logger.Log(r.KVList()...) +} + +func (r Result) KVList() []any { + result := []any{ "Ingester.TotalReached", r.Ingester.TotalReached, "Ingester.TotalChunksMatched", r.Ingester.TotalChunksMatched, "Ingester.TotalBatches", r.Ingester.TotalBatches, @@ -549,13 +553,14 @@ func (r Result) Log(log log.Logger) { "Querier.CompressedBytes", humanize.Bytes(uint64(r.Querier.Store.Chunk.CompressedBytes)), "Querier.TotalDuplicates", r.Querier.Store.Chunk.TotalDuplicates, "Querier.QueryReferencedStructuredMetadata", r.Querier.Store.QueryReferencedStructured, - ) - r.Caches.Log(log) - r.Summary.Log(log) + } + + result = append(result, r.Caches.kvList()...) + return append(result, r.Summary.kvList()...) 
} -func (s Summary) Log(log log.Logger) { - _ = log.Log( +func (s Summary) kvList() []any { + return []any{ "Summary.BytesProcessedPerSecond", humanize.Bytes(uint64(s.BytesProcessedPerSecond)), "Summary.LinesProcessedPerSecond", s.LinesProcessedPerSecond, "Summary.TotalBytesProcessed", humanize.Bytes(uint64(s.TotalBytesProcessed)), @@ -563,11 +568,11 @@ func (s Summary) Log(log log.Logger) { "Summary.PostFilterLines", s.TotalPostFilterLines, "Summary.ExecTime", ConvertSecondsToNanoseconds(s.ExecTime), "Summary.QueueTime", ConvertSecondsToNanoseconds(s.QueueTime), - ) + } } -func (c Caches) Log(log log.Logger) { - _ = log.Log( +func (c Caches) kvList() []any { + return []any{ "Cache.Chunk.Requests", c.Chunk.Requests, "Cache.Chunk.EntriesRequested", c.Chunk.EntriesRequested, "Cache.Chunk.EntriesFound", c.Chunk.EntriesFound, @@ -620,5 +625,5 @@ func (c Caches) Log(log log.Logger) { "Cache.InstantMetricResult.BytesSent", humanize.Bytes(uint64(c.InstantMetricResult.BytesSent)), "Cache.InstantMetricResult.BytesReceived", humanize.Bytes(uint64(c.InstantMetricResult.BytesReceived)), "Cache.InstantMetricResult.DownloadTime", c.InstantMetricResult.CacheDownloadTime(), - ) + } } diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 021cbbfe7e..302c1c4281 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -112,15 +112,16 @@ func (q *QuerierAPI) LabelHandler(ctx context.Context, req *logproto.LabelReques resLength = len(resp.Values) } statResult := statsCtx.Result(time.Since(start), queueTime, resLength) - log := spanlogger.FromContext(ctx) - statResult.Log(level.Debug(log)) + if sp := opentracing.SpanFromContext(ctx); sp != nil { + sp.LogKV(statResult.KVList()...) 
+ } status := 200 if err != nil { status, _ = serverutil.ClientHTTPStatusAndError(err) } - logql.RecordLabelQueryMetrics(ctx, log, *req.Start, *req.End, req.Name, req.Query, strconv.Itoa(status), statResult) + logql.RecordLabelQueryMetrics(ctx, util_log.Logger, *req.Start, *req.End, req.Name, req.Query, strconv.Itoa(status), statResult) return resp, err } @@ -266,15 +267,16 @@ func (q *QuerierAPI) SeriesHandler(ctx context.Context, req *logproto.SeriesRequ } statResult := statsCtx.Result(time.Since(start), queueTime, resLength) - log := spanlogger.FromContext(ctx) - statResult.Log(level.Debug(log)) + if sp := opentracing.SpanFromContext(ctx); sp != nil { + sp.LogKV(statResult.KVList()...) + } status := 200 if err != nil { status, _ = serverutil.ClientHTTPStatusAndError(err) } - logql.RecordSeriesQueryMetrics(ctx, log, req.Start, req.End, req.Groups, strconv.Itoa(status), req.GetShards(), statResult) + logql.RecordSeriesQueryMetrics(ctx, util_log.Logger, req.Start, req.End, req.Groups, strconv.Itoa(status), req.GetShards(), statResult) return resp, statResult, err } @@ -296,15 +298,16 @@ func (q *QuerierAPI) IndexStatsHandler(ctx context.Context, req *loghttp.RangeQu queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration) statResult := statsCtx.Result(time.Since(start), queueTime, 1) - log := spanlogger.FromContext(ctx) - statResult.Log(level.Debug(log)) + if sp := opentracing.SpanFromContext(ctx); sp != nil { + sp.LogKV(statResult.KVList()...) 
+ } status := 200 if err != nil { status, _ = serverutil.ClientHTTPStatusAndError(err) } - logql.RecordStatsQueryMetrics(ctx, log, req.Start, req.End, req.Query, strconv.Itoa(status), statResult) + logql.RecordStatsQueryMetrics(ctx, util_log.Logger, req.Start, req.End, req.Query, strconv.Itoa(status), statResult) return resp, err } @@ -327,8 +330,9 @@ func (q *QuerierAPI) IndexShardsHandler(ctx context.Context, req *loghttp.RangeQ statResult := statsCtx.Result(time.Since(start), queueTime, resLength) - log := spanlogger.FromContext(ctx) - statResult.Log(level.Debug(log)) + if sp := opentracing.SpanFromContext(ctx); sp != nil { + sp.LogKV(statResult.KVList()...) + } status := 200 if err != nil { @@ -336,7 +340,7 @@ func (q *QuerierAPI) IndexShardsHandler(ctx context.Context, req *loghttp.RangeQ } logql.RecordShardsQueryMetrics( - ctx, log, req.Start, req.End, req.Query, targetBytesPerShard, strconv.Itoa(status), resLength, statResult, + ctx, util_log.Logger, req.Start, req.End, req.Query, targetBytesPerShard, strconv.Itoa(status), resLength, statResult, ) return resp, err @@ -363,15 +367,16 @@ func (q *QuerierAPI) VolumeHandler(ctx context.Context, req *logproto.VolumeRequ queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration) statResult := statsCtx.Result(time.Since(start), queueTime, 1) - log := spanlogger.FromContext(ctx) - statResult.Log(level.Debug(log)) + if sp := opentracing.SpanFromContext(ctx); sp != nil { + sp.LogKV(statResult.KVList()...) 
+ } status := 200 if err != nil { status, _ = serverutil.ClientHTTPStatusAndError(err) } - logql.RecordVolumeQueryMetrics(ctx, log, req.From.Time(), req.Through.Time(), req.GetQuery(), uint32(req.GetLimit()), time.Duration(req.GetStep()), strconv.Itoa(status), statResult) + logql.RecordVolumeQueryMetrics(ctx, util_log.Logger, req.From.Time(), req.Through.Time(), req.GetQuery(), uint32(req.GetLimit()), time.Duration(req.GetStep()), strconv.Itoa(status), statResult) return resp, nil } diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 905124fb7c..0cd101cd6e 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -161,6 +161,7 @@ func (q *SingleTenantQuerier) SelectLogs(ctx context.Context, params logql.Selec ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(params.Start, params.End) + sp := opentracing.SpanFromContext(ctx) iters := []iter.EntryIterator{} if !q.cfg.QueryStoreOnly && ingesterQueryInterval != nil { // Make a copy of the request before modifying @@ -171,9 +172,11 @@ func (q *SingleTenantQuerier) SelectLogs(ctx context.Context, params logql.Selec } newParams.Start = ingesterQueryInterval.start newParams.End = ingesterQueryInterval.end - level.Debug(spanlogger.FromContext(ctx)).Log( - "msg", "querying ingester", - "params", newParams) + if sp != nil { + sp.LogKV( + "msg", "querying ingester", + "params", newParams) + } ingesterIters, err := q.ingesterQuerier.SelectLogs(ctx, newParams) if err != nil { return nil, err @@ -185,9 +188,11 @@ func (q *SingleTenantQuerier) SelectLogs(ctx context.Context, params logql.Selec if !q.cfg.QueryIngesterOnly && storeQueryInterval != nil { params.Start = storeQueryInterval.start params.End = storeQueryInterval.end - level.Debug(spanlogger.FromContext(ctx)).Log( - "msg", "querying store", - "params", params) + if sp != nil { + sp.LogKV( + "msg", "querying store", + "params", params) + } storeIter, err := q.store.SelectLogs(ctx, params) if err != nil { return nil, err diff --git 
a/pkg/querier/queryrange/downstreamer.go b/pkg/querier/queryrange/downstreamer.go index 9f946a3247..f4b4b18cac 100644 --- a/pkg/querier/queryrange/downstreamer.go +++ b/pkg/querier/queryrange/downstreamer.go @@ -6,7 +6,6 @@ import ( "reflect" "time" - "github.com/go-kit/log/level" "github.com/grafana/dskit/concurrency" "github.com/grafana/dskit/tenant" "github.com/opentracing/opentracing-go" @@ -19,7 +18,6 @@ import ( "github.com/grafana/loki/v3/pkg/logqlmodel" "github.com/grafana/loki/v3/pkg/querier/plan" "github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase" - "github.com/grafana/loki/v3/pkg/util/spanlogger" ) const ( @@ -144,8 +142,7 @@ func (in instance) Downstream(ctx context.Context, queries []logql.DownstreamQue } sp, ctx := opentracing.StartSpanFromContext(ctx, "DownstreamHandler.instance") defer sp.Finish() - logger := spanlogger.FromContext(ctx) - level.Debug(logger).Log("shards", fmt.Sprintf("%+v", qry.Params.Shards()), "query", req.GetQuery(), "step", req.GetStep(), "handler", reflect.TypeOf(in.handler), "engine", "downstream") + sp.LogKV("shards", fmt.Sprintf("%+v", qry.Params.Shards()), "query", req.GetQuery(), "step", req.GetStep(), "handler", reflect.TypeOf(in.handler), "engine", "downstream") res, err := in.handler.Do(ctx, req) if err != nil { diff --git a/pkg/querier/queryrange/limits.go b/pkg/querier/queryrange/limits.go index 68f71680dd..695c0d5346 100644 --- a/pkg/querier/queryrange/limits.go +++ b/pkg/querier/queryrange/limits.go @@ -277,8 +277,6 @@ func NewQuerySizeLimiterMiddleware( func (q *querySizeLimiter) getBytesReadForRequest(ctx context.Context, r queryrangebase.Request) (uint64, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "querySizeLimiter.getBytesReadForRequest") defer sp.Finish() - log := spanlogger.FromContextWithFallback(ctx, q.logger) - defer log.Finish() expr, err := syntax.ParseExpr(r.GetQuery()) if err != nil { @@ -300,7 +298,7 @@ func (q *querySizeLimiter) getBytesReadForRequest(ctx context.Context, 
r queryra combinedStats := stats.MergeStats(matcherStats...) - level.Debug(log).Log( + level.Debug(q.logger).Log( append( combinedStats.LoggingKeyValues(), "msg", "queried index", @@ -371,8 +369,6 @@ func (q *querySizeLimiter) Do(ctx context.Context, r queryrangebase.Request) (qu level.Warn(log).Log("msg", "Query exceeds limits", "status", "rejected", "limit_name", q.guessLimitName(), "limit_bytes", maxBytesReadStr, "resolved_bytes", statsBytesStr) return nil, httpgrpc.Errorf(http.StatusBadRequest, q.limitErrorTmpl, statsBytesStr, maxBytesReadStr) } - - level.Debug(log).Log("msg", "Query is within limits", "status", "accepted", "limit_name", q.guessLimitName(), "limit_bytes", maxBytesReadStr, "resolved_bytes", statsBytesStr) } return q.next.Do(ctx, r) diff --git a/pkg/querier/queryrange/queryrangebase/query_range.go b/pkg/querier/queryrange/queryrangebase/query_range.go index 44ac64f021..bb85f1a191 100644 --- a/pkg/querier/queryrange/queryrangebase/query_range.go +++ b/pkg/querier/queryrange/queryrangebase/query_range.go @@ -17,13 +17,13 @@ import ( "github.com/grafana/dskit/httpgrpc" jsoniter "github.com/json-iterator/go" "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/log" otlog "github.com/opentracing/opentracing-go/log" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/timestamp" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache" - "github.com/grafana/loki/v3/pkg/util/spanlogger" ) // StatusSuccess Prometheus success result. 
@@ -208,15 +208,13 @@ func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ R } sp, ctx := opentracing.StartSpanFromContext(ctx, "ParseQueryRangeResponse") //nolint:ineffassign,staticcheck defer sp.Finish() - log := spanlogger.FromContext(ctx) - defer log.Finish() buf, err := bodyBuffer(r) if err != nil { log.Error(err) return nil, err } - log.LogFields(otlog.Int("bytes", len(buf))) + sp.LogFields(otlog.Int("bytes", len(buf))) var resp PrometheusResponse if err := json.Unmarshal(buf, &resp); err != nil { diff --git a/pkg/querier/queryrange/querysharding.go b/pkg/querier/queryrange/querysharding.go index 9005271cc9..07da7abfb6 100644 --- a/pkg/querier/queryrange/querysharding.go +++ b/pkg/querier/queryrange/querysharding.go @@ -146,9 +146,10 @@ func (ast *astMapperware) checkQuerySizeLimit(ctx context.Context, bytesPerShard } func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { - logger := spanlogger.FromContextWithFallback( + logger := util_log.WithContext(ctx, ast.logger) + spLogger := spanlogger.FromContextWithFallback( ctx, - util_log.WithContext(ctx, ast.logger), + logger, ) params, err := ParamsFromRequest(r) @@ -158,14 +159,14 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que maxRVDuration, maxOffset, err := maxRangeVectorAndOffsetDuration(params.GetExpression()) if err != nil { - level.Warn(logger).Log("err", err.Error(), "msg", "failed to get range-vector and offset duration so skipped AST mapper for request") + level.Warn(spLogger).Log("err", err.Error(), "msg", "failed to get range-vector and offset duration so skipped AST mapper for request") return ast.next.Do(ctx, r) } conf, err := ast.confs.GetConf(int64(model.Time(r.GetStart().UnixMilli()).Add(-maxRVDuration).Add(-maxOffset)), int64(model.Time(r.GetEnd().UnixMilli()).Add(-maxOffset))) // cannot shard with this timerange if err != nil { - level.Warn(logger).Log("err", err.Error(), "msg",
"skipped AST mapper for request") + level.Warn(spLogger).Log("err", err.Error(), "msg", "skipped AST mapper for request") return ast.next.Do(ctx, r) } @@ -200,7 +201,7 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que v := ast.limits.TSDBShardingStrategy(tenants[0]) version, err := logql.ParseShardVersion(v) if err != nil { - level.Warn(logger).Log( + level.Warn(spLogger).Log( "msg", "failed to parse shard version", "fallback", version.String(), "err", err.Error(), @@ -214,7 +215,7 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que noop, bytesPerShard, parsed, err := mapper.Parse(params.GetExpression()) if err != nil { - level.Warn(logger).Log("msg", "failed mapping AST", "err", err.Error(), "query", r.GetQuery()) + level.Warn(spLogger).Log("msg", "failed mapping AST", "err", err.Error(), "query", r.GetQuery()) return nil, err } level.Debug(logger).Log("no-op", noop, "mapped", parsed.String()) diff --git a/pkg/querier/queryrange/shard_resolver.go b/pkg/querier/queryrange/shard_resolver.go index ab4d23e830..33438c3717 100644 --- a/pkg/querier/queryrange/shard_resolver.go +++ b/pkg/querier/queryrange/shard_resolver.go @@ -24,6 +24,7 @@ import ( "github.com/grafana/loki/v3/pkg/storage/stores/index/stats" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding" "github.com/grafana/loki/v3/pkg/storage/types" + util_log "github.com/grafana/loki/v3/pkg/util/log" "github.com/grafana/loki/v3/pkg/util/spanlogger" "github.com/grafana/loki/v3/pkg/util/validation" ) @@ -141,8 +142,6 @@ func getStatsForMatchers( func (r *dynamicShardResolver) GetStats(e syntax.Expr) (stats.Stats, error) { sp, ctx := opentracing.StartSpanFromContext(r.ctx, "dynamicShardResolver.GetStats") defer sp.Finish() - log := spanlogger.FromContext(r.ctx) - defer log.Finish() start := time.Now() @@ -159,6 +158,7 @@ func (r *dynamicShardResolver) GetStats(e syntax.Expr) (stats.Stats, error) { grps = append(grps, 
syntax.MatcherRange{}) } + log := util_log.WithContext(ctx, util_log.Logger) results, err := getStatsForMatchers(ctx, log, r.statsHandler, r.from, r.through, grps, r.maxParallelism, r.defaultLookback) if err != nil { return stats.Stats{}, err diff --git a/pkg/querier/queryrange/stats.go b/pkg/querier/queryrange/stats.go index 67ca803d52..f890242d15 100644 --- a/pkg/querier/queryrange/stats.go +++ b/pkg/querier/queryrange/stats.go @@ -195,7 +195,9 @@ func StatsCollectorMiddleware() queryrangebase.Middleware { // Re-calculate the summary: the queueTime result is already merged so should not be updated // Log and record metrics for the current query responseStats.ComputeSummary(time.Since(start), 0, totalEntries) - responseStats.Log(level.Debug(logger)) + if logger.Span != nil { + logger.Span.LogKV(responseStats.KVList()...) + } } ctxValue := ctx.Value(ctxKey) if data, ok := ctxValue.(*queryData); ok { diff --git a/pkg/storage/async_store.go b/pkg/storage/async_store.go index ed3c9dab6b..49fe26612e 100644 --- a/pkg/storage/async_store.go +++ b/pkg/storage/async_store.go @@ -25,7 +25,6 @@ import ( "github.com/grafana/loki/v3/pkg/storage/config" "github.com/grafana/loki/v3/pkg/storage/stores/index/stats" util_log "github.com/grafana/loki/v3/pkg/util/log" - "github.com/grafana/loki/v3/pkg/util/spanlogger" ) type IngesterQuerier interface { @@ -73,8 +72,6 @@ func (a *AsyncStore) GetChunks(ctx context.Context, predicate chunk.Predicate, storeChunksOverride *logproto.ChunkRefGroup, ) ([][]chunk.Chunk, []*fetcher.Fetcher, error) { - spanLogger := spanlogger.FromContext(ctx) - errs := make(chan error) var storeChunks [][]chunk.Chunk @@ -98,7 +95,9 @@ func (a *AsyncStore) GetChunks(ctx context.Context, ingesterChunks, err = a.ingesterQuerier.GetChunkIDs(ctx, from, through, predicate.Matchers...) 
if err == nil { - level.Debug(spanLogger).Log("ingester-chunks-count", len(ingesterChunks)) + if sp := opentracing.SpanFromContext(ctx); sp != nil { + sp.LogKV("ingester-chunks-count", len(ingesterChunks)) + } level.Debug(util_log.Logger).Log("msg", "got chunk ids from ingester", "count", len(ingesterChunks)) } errs <- err diff --git a/pkg/storage/chunk/cache/resultscache/cache.go b/pkg/storage/chunk/cache/resultscache/cache.go index d6e153cf69..aaf1d47fa8 100644 --- a/pkg/storage/chunk/cache/resultscache/cache.go +++ b/pkg/storage/chunk/cache/resultscache/cache.go @@ -21,8 +21,8 @@ import ( "github.com/grafana/loki/v3/pkg/logqlmodel/stats" "github.com/grafana/loki/v3/pkg/storage/chunk/cache" + util_log "github.com/grafana/loki/v3/pkg/util/log" "github.com/grafana/loki/v3/pkg/util/math" - "github.com/grafana/loki/v3/pkg/util/spanlogger" "github.com/grafana/loki/v3/pkg/util/validation" ) @@ -183,8 +183,6 @@ func (s ResultsCache) handleHit(ctx context.Context, r Request, extents []Extent ) sp, ctx := opentracing.StartSpanFromContext(ctx, "handleHit") defer sp.Finish() - log := spanlogger.FromContext(ctx) - defer log.Finish() requests, responses, err := s.partition(r, extents) if err != nil { @@ -426,14 +424,11 @@ func (s ResultsCache) get(ctx context.Context, key string) ([]Extent, bool) { var resp CachedResponse sp, ctx := opentracing.StartSpanFromContext(ctx, "unmarshal-extent") //nolint:ineffassign,staticcheck defer sp.Finish() - log := spanlogger.FromContext(ctx) - defer log.Finish() - log.LogFields(otlog.Int("bytes", len(bufs[0]))) + sp.LogFields(otlog.Int("bytes", len(bufs[0]))) if err := proto.Unmarshal(bufs[0], &resp); err != nil { - level.Error(log).Log("msg", "error unmarshalling cached value", "err", err) - log.Error(err) + level.Error(util_log.Logger).Log("msg", "error unmarshalling cached value", "err", err) return nil, false } diff --git a/pkg/storage/chunk/client/util/parallel_chunk_fetch.go b/pkg/storage/chunk/client/util/parallel_chunk_fetch.go index 
c61fdcf1bd..40a9af1ddf 100644 --- a/pkg/storage/chunk/client/util/parallel_chunk_fetch.go +++ b/pkg/storage/chunk/client/util/parallel_chunk_fetch.go @@ -4,11 +4,12 @@ import ( "context" "sync" + "github.com/go-kit/log/level" "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" "github.com/grafana/loki/v3/pkg/storage/chunk" - "github.com/grafana/loki/v3/pkg/util/spanlogger" + util_log "github.com/grafana/loki/v3/pkg/util/log" ) var decodeContextPool = sync.Pool{ @@ -21,9 +22,7 @@ var decodeContextPool = sync.Pool{ func GetParallelChunks(ctx context.Context, maxParallel int, chunks []chunk.Chunk, f func(context.Context, *chunk.DecodeContext, chunk.Chunk) (chunk.Chunk, error)) ([]chunk.Chunk, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "GetParallelChunks") defer sp.Finish() - log := spanlogger.FromContext(ctx) - defer log.Finish() - log.LogFields(otlog.Int("requested", len(chunks))) + sp.LogFields(otlog.Int("requested", len(chunks))) if ctx.Err() != nil { return nil, ctx.Err() @@ -67,9 +66,9 @@ func GetParallelChunks(ctx context.Context, maxParallel int, chunks []chunk.Chun } } - log.LogFields(otlog.Int("fetched", len(result))) + sp.LogFields(otlog.Int("fetched", len(result))) if lastErr != nil { - log.Error(lastErr) + level.Error(util_log.Logger).Log("msg", "error fetching chunks", "err", lastErr) } // Return any chunks we did receive: a partial result may be useful diff --git a/pkg/storage/stores/composite_store_entry.go b/pkg/storage/stores/composite_store_entry.go index 5b8db237f4..2a376e4525 100644 --- a/pkg/storage/stores/composite_store_entry.go +++ b/pkg/storage/stores/composite_store_entry.go @@ -21,7 +21,6 @@ import ( "github.com/grafana/loki/v3/pkg/storage/stores/index/stats" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding" util_log "github.com/grafana/loki/v3/pkg/util/log" - "github.com/grafana/loki/v3/pkg/util/spanlogger" "github.com/grafana/loki/v3/pkg/util/validation" 
) @@ -114,8 +113,6 @@ func (c *storeEntry) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) { func (c *storeEntry) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, matchers ...*labels.Matcher) ([]string, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.LabelNamesForMetricName") defer sp.Finish() - log := spanlogger.FromContext(ctx) - defer log.Span.Finish() shortcut, err := c.validateQueryTimeRange(ctx, userID, &from, &through) if err != nil { @@ -123,7 +120,7 @@ func (c *storeEntry) LabelNamesForMetricName(ctx context.Context, userID string, } else if shortcut { return nil, nil } - level.Debug(log).Log("metric", metricName) + sp.LogKV("metric", metricName) return c.indexReader.LabelNamesForMetricName(ctx, userID, from, through, metricName, matchers...) } @@ -131,8 +128,6 @@ func (c *storeEntry) LabelNamesForMetricName(ctx context.Context, userID string, func (c *storeEntry) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.LabelValuesForMetricName") defer sp.Finish() - log := spanlogger.FromContext(ctx) - defer log.Span.Finish() shortcut, err := c.validateQueryTimeRange(ctx, userID, &from, &through) if err != nil { diff --git a/pkg/storage/stores/series/series_index_store.go b/pkg/storage/stores/series/series_index_store.go index c3ef58266e..9fb64fe9b8 100644 --- a/pkg/storage/stores/series/series_index_store.go +++ b/pkg/storage/stores/series/series_index_store.go @@ -32,7 +32,6 @@ import ( "github.com/grafana/loki/v3/pkg/util/constants" "github.com/grafana/loki/v3/pkg/util/extract" util_log "github.com/grafana/loki/v3/pkg/util/log" - "github.com/grafana/loki/v3/pkg/util/spanlogger" ) var ( @@ -319,15 +318,13 @@ func (c *IndexReaderWriter) chunksToSeries(ctx context.Context, in []logproto.Ch func 
(c *IndexReaderWriter) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, matchers ...*labels.Matcher) ([]string, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.LabelNamesForMetricName") defer sp.Finish() - log := spanlogger.FromContext(ctx) - defer log.Span.Finish() // Fetch the series IDs from the index seriesIDs, err := c.lookupSeriesByMetricNameMatchers(ctx, from, through, userID, metricName, matchers) if err != nil { return nil, err } - level.Debug(log).Log("series-ids", len(seriesIDs)) + sp.LogKV("series-ids", len(seriesIDs)) // Lookup the series in the index to get label names. labelNames, err := c.lookupLabelNamesBySeries(ctx, from, through, userID, seriesIDs) @@ -336,10 +333,10 @@ func (c *IndexReaderWriter) LabelNamesForMetricName(ctx context.Context, userID if err == series_index.ErrNotSupported { return c.lookupLabelNamesByChunks(ctx, from, through, userID, seriesIDs) } - level.Error(log).Log("msg", "lookupLabelNamesBySeries", "err", err) + sp.LogKV("msg", "lookupLabelNamesBySeries", "err", err) return nil, err } - level.Debug(log).Log("labelNames", len(labelNames)) + sp.LogKV("labelNames", len(labelNames)) return labelNames, nil } @@ -347,14 +344,12 @@ func (c *IndexReaderWriter) LabelNamesForMetricName(ctx context.Context, userID func (c *IndexReaderWriter) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.LabelValuesForMetricName") defer sp.Finish() - log := spanlogger.FromContext(ctx) - defer log.Span.Finish() if len(matchers) != 0 { return c.labelValuesForMetricNameWithMatchers(ctx, userID, from, through, metricName, labelName, matchers...) 
} - level.Debug(log).Log("from", from, "through", through, "metricName", metricName, "labelName", labelName) + sp.LogKV("from", from, "through", through, "metricName", metricName, "labelName", labelName) queries, err := c.schema.GetReadQueriesForMetricLabel(from, through, userID, metricName, labelName) if err != nil { @@ -634,10 +629,8 @@ func (c *IndexReaderWriter) lookupEntriesByQueries(ctx context.Context, queries func (c *IndexReaderWriter) lookupLabelNamesBySeries(ctx context.Context, from, through model.Time, userID string, seriesIDs []string) ([]string, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.lookupLabelNamesBySeries") defer sp.Finish() - log := spanlogger.FromContext(ctx) - defer log.Span.Finish() - level.Debug(log).Log("seriesIDs", len(seriesIDs)) + sp.LogKV("seriesIDs", len(seriesIDs)) queries := make([]series_index.Query, 0, len(seriesIDs)) for _, seriesID := range seriesIDs { qs, err := c.schema.GetLabelNamesForSeries(from, through, userID, []byte(seriesID)) @@ -646,7 +639,7 @@ func (c *IndexReaderWriter) lookupLabelNamesBySeries(ctx context.Context, from, } queries = append(queries, qs...) 
} - level.Debug(log).Log("queries", len(queries)) + sp.LogKV("queries", len(queries)) entries := entriesPool.Get().(*[]series_index.Entry) defer entriesPool.Put(entries) err := c.lookupEntriesByQueries(ctx, queries, entries) @@ -654,7 +647,7 @@ func (c *IndexReaderWriter) lookupLabelNamesBySeries(ctx context.Context, from, return nil, err } - level.Debug(log).Log("entries", len(*entries)) + sp.LogKV("entries", len(*entries)) var result util.UniqueStrings for _, entry := range *entries { @@ -671,34 +664,32 @@ func (c *IndexReaderWriter) lookupLabelNamesBySeries(ctx context.Context, from, func (c *IndexReaderWriter) lookupLabelNamesByChunks(ctx context.Context, from, through model.Time, userID string, seriesIDs []string) ([]string, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.lookupLabelNamesByChunks") defer sp.Finish() - log := spanlogger.FromContext(ctx) - defer log.Span.Finish() // Lookup the series in the index to get the chunks. chunkIDs, err := c.lookupChunksBySeries(ctx, from, through, userID, seriesIDs) if err != nil { - level.Error(log).Log("msg", "lookupChunksBySeries", "err", err) + sp.LogKV("msg", "lookupChunksBySeries", "err", err) return nil, err } - level.Debug(log).Log("chunk-ids", len(chunkIDs)) + sp.LogKV("chunk-ids", len(chunkIDs)) chunks, err := c.convertChunkIDsToChunks(ctx, userID, chunkIDs) if err != nil { - level.Error(log).Log("err", "convertChunkIDsToChunks", "err", err) + sp.LogKV("err", "convertChunkIDsToChunks", "err", err) return nil, err } // Filter out chunks that are not in the selected time range and keep a single chunk per fingerprint filtered := filterChunksByTime(from, through, chunks) filtered = filterChunksByUniqueFingerprint(filtered) - level.Debug(log).Log("Chunks post filtering", len(chunks)) + sp.LogKV("Chunks post filtering", len(chunks)) chunksPerQuery.Observe(float64(len(filtered))) // Now fetch the actual chunk data from Memcache / S3 allChunks, err := c.fetcher.FetchChunks(ctx, filtered) if err 
!= nil { - level.Error(log).Log("msg", "FetchChunks", "err", err) + sp.LogKV("msg", "FetchChunks", "err", err) return nil, err } return labelNamesFromChunks(allChunks), nil diff --git a/pkg/storage/stores/shipper/bloomshipper/client.go b/pkg/storage/stores/shipper/bloomshipper/client.go index ce70ce172c..f6da2168ae 100644 --- a/pkg/storage/stores/shipper/bloomshipper/client.go +++ b/pkg/storage/stores/shipper/bloomshipper/client.go @@ -14,6 +14,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/concurrency" + "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/common/model" @@ -23,7 +24,6 @@ import ( "github.com/grafana/loki/v3/pkg/storage/config" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" "github.com/grafana/loki/v3/pkg/util/encoding" - "github.com/grafana/loki/v3/pkg/util/spanlogger" ) const ( @@ -494,15 +494,16 @@ func newCachedListOpObjectClient(oc client.ObjectClient, ttl, interval time.Dura func (c *cachedListOpObjectClient) List(ctx context.Context, prefix string, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) { var ( - logger = spanlogger.FromContext(ctx) start = time.Now() cacheDur time.Duration ) defer func() { - logger.LogKV( - "cache_duration", cacheDur, - "total_duration", time.Since(start), - ) + if sp := opentracing.SpanFromContext(ctx); sp != nil { + sp.LogKV( + "cache_duration", cacheDur, + "total_duration", time.Since(start), + ) + } }() if delimiter != "" { diff --git a/pkg/storage/stores/shipper/indexshipper/storage/cached_client.go b/pkg/storage/stores/shipper/indexshipper/storage/cached_client.go index c7d909bc09..2aa3cfda87 100644 --- a/pkg/storage/stores/shipper/indexshipper/storage/cached_client.go +++ b/pkg/storage/stores/shipper/indexshipper/storage/cached_client.go @@ -9,11 +9,11 @@ import ( "time" "github.com/go-kit/log/level" + "github.com/opentracing/opentracing-go" 
"golang.org/x/sync/singleflight" "github.com/grafana/loki/v3/pkg/storage/chunk/client" util_log "github.com/grafana/loki/v3/pkg/util/log" - "github.com/grafana/loki/v3/pkg/util/spanlogger" ) const ( @@ -190,12 +190,13 @@ func (c *cachedObjectClient) buildTableNamesCache(ctx context.Context) (err erro } }() - logger := spanlogger.FromContextWithFallback(ctx, util_log.Logger) - level.Info(logger).Log("msg", "building table names cache") - now := time.Now() - defer func() { - level.Info(logger).Log("msg", "table names cache built", "duration", time.Since(now)) - }() + if sp := opentracing.SpanFromContext(ctx); sp != nil { + sp.LogKV("msg", "building table names cache") + now := time.Now() + defer func() { + sp.LogKV("msg", "table names cache built", "duration", time.Since(now)) + }() + } _, tableNames, err := c.ObjectClient.List(ctx, "", delimiter) if err != nil { @@ -276,12 +277,13 @@ func (t *table) buildCache(ctx context.Context, objectClient client.ObjectClient } }() - logger := spanlogger.FromContextWithFallback(ctx, util_log.Logger) - level.Info(logger).Log("msg", "building table cache") - now := time.Now() - defer func() { - level.Info(logger).Log("msg", "table cache built", "duration", time.Since(now)) - }() + if sp := opentracing.SpanFromContext(ctx); sp != nil { + sp.LogKV("msg", "building table cache") + now := time.Now() + defer func() { + sp.LogKV("msg", "table cache built", "duration", time.Since(now)) + }() + } objects, _, err := objectClient.List(ctx, t.name+delimiter, "") if err != nil {