diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index fcbfcb450e..8b46ed4d83 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -36,7 +36,6 @@ import ( "github.com/grafana/loki/v3/pkg/util/httpreq" logutil "github.com/grafana/loki/v3/pkg/util/log" "github.com/grafana/loki/v3/pkg/util/server" - "github.com/grafana/loki/v3/pkg/util/spanlogger" "github.com/grafana/loki/v3/pkg/util/validation" ) @@ -231,7 +230,6 @@ func (q *query) resultLength(res promql_parser.Value) int { func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "query.Exec") defer sp.Finish() - spLogger := spanlogger.FromContext(ctx) sp.LogKV( "type", GetRangeType(q.params), @@ -265,7 +263,7 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) { queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration) statResult := statsCtx.Result(time.Since(start), queueTime, q.resultLength(data)) - statResult.Log(level.Debug(spLogger)) + sp.LogKV(statResult.KVList()...) status, _ := server.ClientHTTPStatusAndError(err) diff --git a/pkg/logql/metrics.go b/pkg/logql/metrics.go index 052446c6b5..6f35f84b3a 100644 --- a/pkg/logql/metrics.go +++ b/pkg/logql/metrics.go @@ -377,7 +377,6 @@ func RecordStatsQueryMetrics(ctx context.Context, log log.Logger, start, end tim "query", query, "query_hash", util.HashedQuery(query), "total_entries", stats.Summary.TotalEntriesReturned) - level.Info(logger).Log(logValues...) execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime) diff --git a/pkg/logql/metrics_test.go b/pkg/logql/metrics_test.go index 44094e27f5..577627a202 100644 --- a/pkg/logql/metrics_test.go +++ b/pkg/logql/metrics_test.go @@ -92,8 +92,8 @@ func TestLogSlowQuery(t *testing.T) { func TestLogLabelsQuery(t *testing.T) { buf := bytes.NewBufferString("") - logger := log.NewLogfmtLogger(buf) tr, c := jaeger.NewTracer("foo", jaeger.NewConstSampler(true), jaeger.NewInMemoryReporter()) + logger := log.NewLogfmtLogger(buf) defer c.Close() opentracing.SetGlobalTracer(tr) sp := opentracing.StartSpan("") diff --git a/pkg/logqlmodel/stats/context.go b/pkg/logqlmodel/stats/context.go index 18794fb137..a0509be31f 100644 --- a/pkg/logqlmodel/stats/context.go +++ b/pkg/logqlmodel/stats/context.go @@ -26,6 +26,7 @@ import ( "time" "github.com/dustin/go-humanize" + "github.com/go-kit/log" ) @@ -518,9 +519,12 @@ func (c *Context) getCacheStatsByType(t CacheType) *Cache { return stats } -// Log logs a query statistics result. -func (r Result) Log(log log.Logger) { - _ = log.Log( +func (r Result) Log(logger log.Logger) { + logger.Log(r.KVList()...) +} + +func (r Result) KVList() []any { + result := []any{ "Ingester.TotalReached", r.Ingester.TotalReached, "Ingester.TotalChunksMatched", r.Ingester.TotalChunksMatched, "Ingester.TotalBatches", r.Ingester.TotalBatches, @@ -549,13 +553,14 @@ func (r Result) Log(log log.Logger) { "Querier.CompressedBytes", humanize.Bytes(uint64(r.Querier.Store.Chunk.CompressedBytes)), "Querier.TotalDuplicates", r.Querier.Store.Chunk.TotalDuplicates, "Querier.QueryReferencedStructuredMetadata", r.Querier.Store.QueryReferencedStructured, - ) - r.Caches.Log(log) - r.Summary.Log(log) + } + + result = append(result, r.Caches.kvList()...) + return append(result, r.Summary.kvList()...) } -func (s Summary) Log(log log.Logger) { - _ = log.Log( +func (s Summary) kvList() []any { + return []any{ "Summary.BytesProcessedPerSecond", humanize.Bytes(uint64(s.BytesProcessedPerSecond)), "Summary.LinesProcessedPerSecond", s.LinesProcessedPerSecond, "Summary.TotalBytesProcessed", humanize.Bytes(uint64(s.TotalBytesProcessed)), @@ -563,11 +568,11 @@ func (s Summary) Log(log log.Logger) { "Summary.PostFilterLines", s.TotalPostFilterLines, "Summary.ExecTime", ConvertSecondsToNanoseconds(s.ExecTime), "Summary.QueueTime", ConvertSecondsToNanoseconds(s.QueueTime), - ) + } } -func (c Caches) Log(log log.Logger) { - _ = log.Log( +func (c Caches) kvList() []any { + return []any{ "Cache.Chunk.Requests", c.Chunk.Requests, "Cache.Chunk.EntriesRequested", c.Chunk.EntriesRequested, "Cache.Chunk.EntriesFound", c.Chunk.EntriesFound, @@ -620,5 +625,5 @@ func (c Caches) Log(log log.Logger) { "Cache.InstantMetricResult.BytesSent", humanize.Bytes(uint64(c.InstantMetricResult.BytesSent)), "Cache.InstantMetricResult.BytesReceived", humanize.Bytes(uint64(c.InstantMetricResult.BytesReceived)), "Cache.InstantMetricResult.DownloadTime", c.InstantMetricResult.CacheDownloadTime(), - ) + } } diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 021cbbfe7e..302c1c4281 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -112,15 +112,16 @@ func (q *QuerierAPI) LabelHandler(ctx context.Context, req *logproto.LabelReques resLength = len(resp.Values) } statResult := statsCtx.Result(time.Since(start), queueTime, resLength) - log := spanlogger.FromContext(ctx) - statResult.Log(level.Debug(log)) + if sp := opentracing.SpanFromContext(ctx); sp != nil { + sp.LogKV(statResult.KVList()...) + } status := 200 if err != nil { status, _ = serverutil.ClientHTTPStatusAndError(err) } - logql.RecordLabelQueryMetrics(ctx, log, *req.Start, *req.End, req.Name, req.Query, strconv.Itoa(status), statResult) + logql.RecordLabelQueryMetrics(ctx, util_log.Logger, *req.Start, *req.End, req.Name, req.Query, strconv.Itoa(status), statResult) return resp, err } @@ -266,15 +267,16 @@ func (q *QuerierAPI) SeriesHandler(ctx context.Context, req *logproto.SeriesRequ } statResult := statsCtx.Result(time.Since(start), queueTime, resLength) - log := spanlogger.FromContext(ctx) - statResult.Log(level.Debug(log)) + if sp := opentracing.SpanFromContext(ctx); sp != nil { + sp.LogKV(statResult.KVList()...) + } status := 200 if err != nil { status, _ = serverutil.ClientHTTPStatusAndError(err) } - logql.RecordSeriesQueryMetrics(ctx, log, req.Start, req.End, req.Groups, strconv.Itoa(status), req.GetShards(), statResult) + logql.RecordSeriesQueryMetrics(ctx, util_log.Logger, req.Start, req.End, req.Groups, strconv.Itoa(status), req.GetShards(), statResult) return resp, statResult, err } @@ -296,15 +298,16 @@ func (q *QuerierAPI) IndexStatsHandler(ctx context.Context, req *loghttp.RangeQu queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration) statResult := statsCtx.Result(time.Since(start), queueTime, 1) - log := spanlogger.FromContext(ctx) - statResult.Log(level.Debug(log)) + if sp := opentracing.SpanFromContext(ctx); sp != nil { + sp.LogKV(statResult.KVList()...) + } status := 200 if err != nil { status, _ = serverutil.ClientHTTPStatusAndError(err) } - logql.RecordStatsQueryMetrics(ctx, log, req.Start, req.End, req.Query, strconv.Itoa(status), statResult) + logql.RecordStatsQueryMetrics(ctx, util_log.Logger, req.Start, req.End, req.Query, strconv.Itoa(status), statResult) return resp, err } @@ -327,8 +330,9 @@ func (q *QuerierAPI) IndexShardsHandler(ctx context.Context, req *loghttp.RangeQ statResult := statsCtx.Result(time.Since(start), queueTime, resLength) - log := spanlogger.FromContext(ctx) - statResult.Log(level.Debug(log)) + if sp := opentracing.SpanFromContext(ctx); sp != nil { + sp.LogKV(statResult.KVList()...) + } status := 200 if err != nil { @@ -336,7 +340,7 @@ func (q *QuerierAPI) IndexShardsHandler(ctx context.Context, req *loghttp.RangeQ } logql.RecordShardsQueryMetrics( - ctx, log, req.Start, req.End, req.Query, targetBytesPerShard, strconv.Itoa(status), resLength, statResult, + ctx, util_log.Logger, req.Start, req.End, req.Query, targetBytesPerShard, strconv.Itoa(status), resLength, statResult, ) return resp, err @@ -363,15 +367,16 @@ func (q *QuerierAPI) VolumeHandler(ctx context.Context, req *logproto.VolumeRequ queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration) statResult := statsCtx.Result(time.Since(start), queueTime, 1) - log := spanlogger.FromContext(ctx) - statResult.Log(level.Debug(log)) + if sp := opentracing.SpanFromContext(ctx); sp != nil { + sp.LogKV(statResult.KVList()...) + } status := 200 if err != nil { status, _ = serverutil.ClientHTTPStatusAndError(err) } - logql.RecordVolumeQueryMetrics(ctx, log, req.From.Time(), req.Through.Time(), req.GetQuery(), uint32(req.GetLimit()), time.Duration(req.GetStep()), strconv.Itoa(status), statResult) + logql.RecordVolumeQueryMetrics(ctx, util_log.Logger, req.From.Time(), req.Through.Time(), req.GetQuery(), uint32(req.GetLimit()), time.Duration(req.GetStep()), strconv.Itoa(status), statResult) return resp, nil } diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 905124fb7c..0cd101cd6e 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -161,6 +161,7 @@ func (q *SingleTenantQuerier) SelectLogs(ctx context.Context, params logql.Selec ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(params.Start, params.End) + sp := opentracing.SpanFromContext(ctx) iters := []iter.EntryIterator{} if !q.cfg.QueryStoreOnly && ingesterQueryInterval != nil { // Make a copy of the request before modifying @@ -171,9 +172,11 @@ func (q *SingleTenantQuerier) SelectLogs(ctx context.Context, params logql.Selec } newParams.Start = ingesterQueryInterval.start newParams.End = ingesterQueryInterval.end - level.Debug(spanlogger.FromContext(ctx)).Log( - "msg", "querying ingester", - "params", newParams) + if sp != nil { + sp.LogKV( + "msg", "querying ingester", + "params", newParams) + } ingesterIters, err := q.ingesterQuerier.SelectLogs(ctx, newParams) if err != nil { return nil, err @@ -185,9 +188,11 @@ func (q *SingleTenantQuerier) SelectLogs(ctx context.Context, params logql.Selec if !q.cfg.QueryIngesterOnly && storeQueryInterval != nil { params.Start = storeQueryInterval.start params.End = storeQueryInterval.end - level.Debug(spanlogger.FromContext(ctx)).Log( - "msg", "querying store", - "params", params) + if sp != nil { + sp.LogKV( + "msg", "querying store", + "params", params) + } storeIter, err := q.store.SelectLogs(ctx, params) if err != nil { return nil, err diff --git a/pkg/querier/queryrange/downstreamer.go b/pkg/querier/queryrange/downstreamer.go index 9f946a3247..f4b4b18cac 100644 --- a/pkg/querier/queryrange/downstreamer.go +++ b/pkg/querier/queryrange/downstreamer.go @@ -6,7 +6,6 @@ import ( "reflect" "time" - "github.com/go-kit/log/level" "github.com/grafana/dskit/concurrency" "github.com/grafana/dskit/tenant" "github.com/opentracing/opentracing-go" @@ -19,7 +18,6 @@ import ( "github.com/grafana/loki/v3/pkg/logqlmodel" "github.com/grafana/loki/v3/pkg/querier/plan" "github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase" - "github.com/grafana/loki/v3/pkg/util/spanlogger" ) const ( @@ -144,8 +142,7 @@ func (in instance) Downstream(ctx context.Context, queries []logql.DownstreamQue } sp, ctx := opentracing.StartSpanFromContext(ctx, "DownstreamHandler.instance") defer sp.Finish() - logger := spanlogger.FromContext(ctx) - level.Debug(logger).Log("shards", fmt.Sprintf("%+v", qry.Params.Shards()), "query", req.GetQuery(), "step", req.GetStep(), "handler", reflect.TypeOf(in.handler), "engine", "downstream") + sp.LogKV("shards", fmt.Sprintf("%+v", qry.Params.Shards()), "query", req.GetQuery(), "step", req.GetStep(), "handler", reflect.TypeOf(in.handler), "engine", "downstream") res, err := in.handler.Do(ctx, req) if err != nil { diff --git a/pkg/querier/queryrange/limits.go b/pkg/querier/queryrange/limits.go index 68f71680dd..695c0d5346 100644 --- a/pkg/querier/queryrange/limits.go +++ b/pkg/querier/queryrange/limits.go @@ -277,8 +277,6 @@ func NewQuerySizeLimiterMiddleware( func (q *querySizeLimiter) getBytesReadForRequest(ctx context.Context, r queryrangebase.Request) (uint64, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "querySizeLimiter.getBytesReadForRequest") defer sp.Finish() - log := spanlogger.FromContextWithFallback(ctx, q.logger) - defer log.Finish() expr, err := syntax.ParseExpr(r.GetQuery()) if err != nil { @@ -300,7 +298,7 @@ func (q *querySizeLimiter) getBytesReadForRequest(ctx context.Context, r queryra combinedStats := stats.MergeStats(matcherStats...) - level.Debug(log).Log( + level.Debug(q.logger).Log( append( combinedStats.LoggingKeyValues(), "msg", "queried index", @@ -371,8 +369,6 @@ func (q *querySizeLimiter) Do(ctx context.Context, r queryrangebase.Request) (qu level.Warn(log).Log("msg", "Query exceeds limits", "status", "rejected", "limit_name", q.guessLimitName(), "limit_bytes", maxBytesReadStr, "resolved_bytes", statsBytesStr) return nil, httpgrpc.Errorf(http.StatusBadRequest, q.limitErrorTmpl, statsBytesStr, maxBytesReadStr) } - - level.Debug(log).Log("msg", "Query is within limits", "status", "accepted", "limit_name", q.guessLimitName(), "limit_bytes", maxBytesReadStr, "resolved_bytes", statsBytesStr) } return q.next.Do(ctx, r) diff --git a/pkg/querier/queryrange/queryrangebase/query_range.go b/pkg/querier/queryrange/queryrangebase/query_range.go index 44ac64f021..bb85f1a191 100644 --- a/pkg/querier/queryrange/queryrangebase/query_range.go +++ b/pkg/querier/queryrange/queryrangebase/query_range.go @@ -17,13 +17,13 @@ import ( "github.com/grafana/dskit/httpgrpc" jsoniter "github.com/json-iterator/go" "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/log" otlog "github.com/opentracing/opentracing-go/log" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/timestamp" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache" - "github.com/grafana/loki/v3/pkg/util/spanlogger" ) // StatusSuccess Prometheus success result. @@ -208,15 +208,13 @@ func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ R } sp, ctx := opentracing.StartSpanFromContext(ctx, "ParseQueryRangeResponse") //nolint:ineffassign,staticcheck defer sp.Finish() - log := spanlogger.FromContext(ctx) - defer log.Finish() buf, err := bodyBuffer(r) if err != nil { log.Error(err) return nil, err } - log.LogFields(otlog.Int("bytes", len(buf))) + sp.LogKV(otlog.Int("bytes", len(buf))) var resp PrometheusResponse if err := json.Unmarshal(buf, &resp); err != nil { diff --git a/pkg/querier/queryrange/querysharding.go b/pkg/querier/queryrange/querysharding.go index 9005271cc9..07da7abfb6 100644 --- a/pkg/querier/queryrange/querysharding.go +++ b/pkg/querier/queryrange/querysharding.go @@ -146,9 +146,10 @@ func (ast *astMapperware) checkQuerySizeLimit(ctx context.Context, bytesPerShard } func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { - logger := spanlogger.FromContextWithFallback( + logger := util_log.WithContext(ctx, ast.logger) + spLogger := spanlogger.FromContextWithFallback( ctx, - util_log.WithContext(ctx, ast.logger), + logger, ) params, err := ParamsFromRequest(r) @@ -158,14 +159,14 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que maxRVDuration, maxOffset, err := maxRangeVectorAndOffsetDuration(params.GetExpression()) if err != nil { - level.Warn(logger).Log("err", err.Error(), "msg", "failed to get range-vector and offset duration so skipped AST mapper for request") + level.Warn(spLogger).Log("err", err.Error(), "msg", "failed to get range-vector and offset duration so skipped AST mapper for request") return ast.next.Do(ctx, r) } conf, err := ast.confs.GetConf(int64(model.Time(r.GetStart().UnixMilli()).Add(-maxRVDuration).Add(-maxOffset)), int64(model.Time(r.GetEnd().UnixMilli()).Add(-maxOffset))) // cannot shard with this timerange if err != nil { - level.Warn(logger).Log("err", err.Error(), "msg", "skipped AST mapper for request") + level.Warn(spLogger).Log("err", err.Error(), "msg", "skipped AST mapper for request") return ast.next.Do(ctx, r) } @@ -200,7 +201,7 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que v := ast.limits.TSDBShardingStrategy(tenants[0]) version, err := logql.ParseShardVersion(v) if err != nil { - level.Warn(logger).Log( + level.Warn(spLogger).Log( "msg", "failed to parse shard version", "fallback", version.String(), "err", err.Error(), @@ -214,7 +215,7 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que noop, bytesPerShard, parsed, err := mapper.Parse(params.GetExpression()) if err != nil { - level.Warn(logger).Log("msg", "failed mapping AST", "err", err.Error(), "query", r.GetQuery()) + level.Warn(spLogger).Log("msg", "failed mapping AST", "err", err.Error(), "query", r.GetQuery()) return nil, err } level.Debug(logger).Log("no-op", noop, "mapped", parsed.String()) diff --git a/pkg/querier/queryrange/shard_resolver.go b/pkg/querier/queryrange/shard_resolver.go index ab4d23e830..33438c3717 100644 --- a/pkg/querier/queryrange/shard_resolver.go +++ b/pkg/querier/queryrange/shard_resolver.go @@ -24,6 +24,7 @@ import ( "github.com/grafana/loki/v3/pkg/storage/stores/index/stats" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding" "github.com/grafana/loki/v3/pkg/storage/types" + util_log "github.com/grafana/loki/v3/pkg/util/log" "github.com/grafana/loki/v3/pkg/util/spanlogger" "github.com/grafana/loki/v3/pkg/util/validation" ) @@ -141,8 +142,6 @@ func getStatsForMatchers( func (r *dynamicShardResolver) GetStats(e syntax.Expr) (stats.Stats, error) { sp, ctx := opentracing.StartSpanFromContext(r.ctx, "dynamicShardResolver.GetStats") defer sp.Finish() - log := spanlogger.FromContext(r.ctx) - defer log.Finish() start := time.Now() @@ -159,6 +158,7 @@ func (r *dynamicShardResolver) GetStats(e syntax.Expr) (stats.Stats, error) { grps = append(grps, syntax.MatcherRange{}) } + log := util_log.WithContext(ctx, util_log.Logger) results, err := getStatsForMatchers(ctx, log, r.statsHandler, r.from, r.through, grps, r.maxParallelism, r.defaultLookback) if err != nil { return stats.Stats{}, err diff --git a/pkg/querier/queryrange/stats.go b/pkg/querier/queryrange/stats.go index 67ca803d52..f890242d15 100644 --- a/pkg/querier/queryrange/stats.go +++ b/pkg/querier/queryrange/stats.go @@ -195,7 +195,9 @@ func StatsCollectorMiddleware() queryrangebase.Middleware { // Re-calculate the summary: the queueTime result is already merged so should not be updated // Log and record metrics for the current query responseStats.ComputeSummary(time.Since(start), 0, totalEntries) - responseStats.Log(level.Debug(logger)) + if logger.Span != nil { + logger.Span.LogKV(responseStats.KVList()...) + } } ctxValue := ctx.Value(ctxKey) if data, ok := ctxValue.(*queryData); ok { diff --git a/pkg/storage/async_store.go b/pkg/storage/async_store.go index ed3c9dab6b..49fe26612e 100644 --- a/pkg/storage/async_store.go +++ b/pkg/storage/async_store.go @@ -25,7 +25,6 @@ import ( "github.com/grafana/loki/v3/pkg/storage/config" "github.com/grafana/loki/v3/pkg/storage/stores/index/stats" util_log "github.com/grafana/loki/v3/pkg/util/log" - "github.com/grafana/loki/v3/pkg/util/spanlogger" ) type IngesterQuerier interface { @@ -73,8 +72,6 @@ func (a *AsyncStore) GetChunks(ctx context.Context, predicate chunk.Predicate, storeChunksOverride *logproto.ChunkRefGroup, ) ([][]chunk.Chunk, []*fetcher.Fetcher, error) { - spanLogger := spanlogger.FromContext(ctx) - errs := make(chan error) var storeChunks [][]chunk.Chunk @@ -98,7 +95,9 @@ func (a *AsyncStore) GetChunks(ctx context.Context, ingesterChunks, err = a.ingesterQuerier.GetChunkIDs(ctx, from, through, predicate.Matchers...) if err == nil { - level.Debug(spanLogger).Log("ingester-chunks-count", len(ingesterChunks)) + if sp := opentracing.SpanFromContext(ctx); sp != nil { + sp.LogKV("ingester-chunks-count", len(ingesterChunks)) + } level.Debug(util_log.Logger).Log("msg", "got chunk ids from ingester", "count", len(ingesterChunks)) } errs <- err diff --git a/pkg/storage/chunk/cache/resultscache/cache.go b/pkg/storage/chunk/cache/resultscache/cache.go index d6e153cf69..aaf1d47fa8 100644 --- a/pkg/storage/chunk/cache/resultscache/cache.go +++ b/pkg/storage/chunk/cache/resultscache/cache.go @@ -21,8 +21,8 @@ import ( "github.com/grafana/loki/v3/pkg/logqlmodel/stats" "github.com/grafana/loki/v3/pkg/storage/chunk/cache" + util_log "github.com/grafana/loki/v3/pkg/util/log" "github.com/grafana/loki/v3/pkg/util/math" - "github.com/grafana/loki/v3/pkg/util/spanlogger" "github.com/grafana/loki/v3/pkg/util/validation" ) @@ -183,8 +183,6 @@ func (s ResultsCache) handleHit(ctx context.Context, r Request, extents []Extent ) sp, ctx := opentracing.StartSpanFromContext(ctx, "handleHit") defer sp.Finish() - log := spanlogger.FromContext(ctx) - defer log.Finish() requests, responses, err := s.partition(r, extents) if err != nil { @@ -426,14 +424,11 @@ func (s ResultsCache) get(ctx context.Context, key string) ([]Extent, bool) { var resp CachedResponse sp, ctx := opentracing.StartSpanFromContext(ctx, "unmarshal-extent") //nolint:ineffassign,staticcheck defer sp.Finish() - log := spanlogger.FromContext(ctx) - defer log.Finish() - log.LogFields(otlog.Int("bytes", len(bufs[0]))) + sp.LogFields(otlog.Int("bytes", len(bufs[0]))) if err := proto.Unmarshal(bufs[0], &resp); err != nil { - level.Error(log).Log("msg", "error unmarshalling cached value", "err", err) - log.Error(err) + level.Error(util_log.Logger).Log("msg", "error unmarshalling cached value", "err", err) return nil, false } diff --git a/pkg/storage/chunk/client/util/parallel_chunk_fetch.go b/pkg/storage/chunk/client/util/parallel_chunk_fetch.go index c61fdcf1bd..40a9af1ddf 100644 --- a/pkg/storage/chunk/client/util/parallel_chunk_fetch.go +++ b/pkg/storage/chunk/client/util/parallel_chunk_fetch.go @@ -4,11 +4,12 @@ import ( "context" "sync" + "github.com/go-kit/log/level" "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" "github.com/grafana/loki/v3/pkg/storage/chunk" - "github.com/grafana/loki/v3/pkg/util/spanlogger" + util_log "github.com/grafana/loki/v3/pkg/util/log" ) var decodeContextPool = sync.Pool{ @@ -21,9 +22,7 @@ var decodeContextPool = sync.Pool{ func GetParallelChunks(ctx context.Context, maxParallel int, chunks []chunk.Chunk, f func(context.Context, *chunk.DecodeContext, chunk.Chunk) (chunk.Chunk, error)) ([]chunk.Chunk, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "GetParallelChunks") defer sp.Finish() - log := spanlogger.FromContext(ctx) - defer log.Finish() - log.LogFields(otlog.Int("requested", len(chunks))) + sp.LogFields(otlog.Int("requested", len(chunks))) if ctx.Err() != nil { return nil, ctx.Err() @@ -67,9 +66,9 @@ func GetParallelChunks(ctx context.Context, maxParallel int, chunks []chunk.Chun } } - log.LogFields(otlog.Int("fetched", len(result))) + sp.LogFields(otlog.Int("fetched", len(result))) if lastErr != nil { - log.Error(lastErr) + level.Error(util_log.Logger).Log("msg", "error fetching chunks", "err", lastErr) } // Return any chunks we did receive: a partial result may be useful diff --git a/pkg/storage/stores/composite_store_entry.go b/pkg/storage/stores/composite_store_entry.go index 5b8db237f4..2a376e4525 100644 --- a/pkg/storage/stores/composite_store_entry.go +++ b/pkg/storage/stores/composite_store_entry.go @@ -21,7 +21,6 @@ import ( "github.com/grafana/loki/v3/pkg/storage/stores/index/stats" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding" util_log "github.com/grafana/loki/v3/pkg/util/log" - "github.com/grafana/loki/v3/pkg/util/spanlogger" "github.com/grafana/loki/v3/pkg/util/validation" ) @@ -114,8 +113,6 @@ func (c *storeEntry) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) { func (c *storeEntry) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, matchers ...*labels.Matcher) ([]string, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.LabelNamesForMetricName") defer sp.Finish() - log := spanlogger.FromContext(ctx) - defer log.Span.Finish() shortcut, err := c.validateQueryTimeRange(ctx, userID, &from, &through) if err != nil { @@ -123,7 +120,7 @@ func (c *storeEntry) LabelNamesForMetricName(ctx context.Context, userID string, } else if shortcut { return nil, nil } - level.Debug(log).Log("metric", metricName) + sp.LogKV("metric", metricName) return c.indexReader.LabelNamesForMetricName(ctx, userID, from, through, metricName, matchers...) } @@ -131,8 +128,6 @@ func (c *storeEntry) LabelNamesForMetricName(ctx context.Context, userID string, func (c *storeEntry) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.LabelValuesForMetricName") defer sp.Finish() - log := spanlogger.FromContext(ctx) - defer log.Span.Finish() shortcut, err := c.validateQueryTimeRange(ctx, userID, &from, &through) if err != nil { diff --git a/pkg/storage/stores/series/series_index_store.go b/pkg/storage/stores/series/series_index_store.go index c3ef58266e..9fb64fe9b8 100644 --- a/pkg/storage/stores/series/series_index_store.go +++ b/pkg/storage/stores/series/series_index_store.go @@ -32,7 +32,6 @@ import ( "github.com/grafana/loki/v3/pkg/util/constants" "github.com/grafana/loki/v3/pkg/util/extract" util_log "github.com/grafana/loki/v3/pkg/util/log" - "github.com/grafana/loki/v3/pkg/util/spanlogger" ) var ( @@ -319,15 +318,13 @@ func (c *IndexReaderWriter) chunksToSeries(ctx context.Context, in []logproto.Ch func (c *IndexReaderWriter) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, matchers ...*labels.Matcher) ([]string, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.LabelNamesForMetricName") defer sp.Finish() - log := spanlogger.FromContext(ctx) - defer log.Span.Finish() // Fetch the series IDs from the index seriesIDs, err := c.lookupSeriesByMetricNameMatchers(ctx, from, through, userID, metricName, matchers) if err != nil { return nil, err } - level.Debug(log).Log("series-ids", len(seriesIDs)) + sp.LogKV("series-ids", len(seriesIDs)) // Lookup the series in the index to get label names. labelNames, err := c.lookupLabelNamesBySeries(ctx, from, through, userID, seriesIDs) @@ -336,10 +333,10 @@ func (c *IndexReaderWriter) LabelNamesForMetricName(ctx context.Context, userID if err == series_index.ErrNotSupported { return c.lookupLabelNamesByChunks(ctx, from, through, userID, seriesIDs) } - level.Error(log).Log("msg", "lookupLabelNamesBySeries", "err", err) + sp.LogKV("msg", "lookupLabelNamesBySeries", "err", err) return nil, err } - level.Debug(log).Log("labelNames", len(labelNames)) + sp.LogKV("labelNames", len(labelNames)) return labelNames, nil } @@ -347,14 +344,12 @@ func (c *IndexReaderWriter) LabelNamesForMetricName(ctx context.Context, userID func (c *IndexReaderWriter) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.LabelValuesForMetricName") defer sp.Finish() - log := spanlogger.FromContext(ctx) - defer log.Span.Finish() if len(matchers) != 0 { return c.labelValuesForMetricNameWithMatchers(ctx, userID, from, through, metricName, labelName, matchers...) } - level.Debug(log).Log("from", from, "through", through, "metricName", metricName, "labelName", labelName) + sp.LogKV("from", from, "through", through, "metricName", metricName, "labelName", labelName) queries, err := c.schema.GetReadQueriesForMetricLabel(from, through, userID, metricName, labelName) if err != nil { @@ -634,10 +629,8 @@ func (c *IndexReaderWriter) lookupEntriesByQueries(ctx context.Context, queries func (c *IndexReaderWriter) lookupLabelNamesBySeries(ctx context.Context, from, through model.Time, userID string, seriesIDs []string) ([]string, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.lookupLabelNamesBySeries") defer sp.Finish() - log := spanlogger.FromContext(ctx) - defer log.Span.Finish() - level.Debug(log).Log("seriesIDs", len(seriesIDs)) + sp.LogKV("seriesIDs", len(seriesIDs)) queries := make([]series_index.Query, 0, len(seriesIDs)) for _, seriesID := range seriesIDs { qs, err := c.schema.GetLabelNamesForSeries(from, through, userID, []byte(seriesID)) @@ -646,7 +639,7 @@ func (c *IndexReaderWriter) lookupLabelNamesBySeries(ctx context.Context, from, } queries = append(queries, qs...) } - level.Debug(log).Log("queries", len(queries)) + sp.LogKV("queries", len(queries)) entries := entriesPool.Get().(*[]series_index.Entry) defer entriesPool.Put(entries) err := c.lookupEntriesByQueries(ctx, queries, entries) @@ -654,7 +647,7 @@ func (c *IndexReaderWriter) lookupLabelNamesBySeries(ctx context.Context, from, return nil, err } - level.Debug(log).Log("entries", len(*entries)) + sp.LogKV("entries", len(*entries)) var result util.UniqueStrings for _, entry := range *entries { @@ -671,34 +664,32 @@ func (c *IndexReaderWriter) lookupLabelNamesBySeries(ctx context.Context, from, func (c *IndexReaderWriter) lookupLabelNamesByChunks(ctx context.Context, from, through model.Time, userID string, seriesIDs []string) ([]string, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.lookupLabelNamesByChunks") defer sp.Finish() - log := spanlogger.FromContext(ctx) - defer log.Span.Finish() // Lookup the series in the index to get the chunks. chunkIDs, err := c.lookupChunksBySeries(ctx, from, through, userID, seriesIDs) if err != nil { - level.Error(log).Log("msg", "lookupChunksBySeries", "err", err) + sp.LogKV("msg", "lookupChunksBySeries", "err", err) return nil, err } - level.Debug(log).Log("chunk-ids", len(chunkIDs)) + sp.LogKV("chunk-ids", len(chunkIDs)) chunks, err := c.convertChunkIDsToChunks(ctx, userID, chunkIDs) if err != nil { - level.Error(log).Log("err", "convertChunkIDsToChunks", "err", err) + sp.LogKV("err", "convertChunkIDsToChunks", "err", err) return nil, err } // Filter out chunks that are not in the selected time range and keep a single chunk per fingerprint filtered := filterChunksByTime(from, through, chunks) filtered = filterChunksByUniqueFingerprint(filtered) - level.Debug(log).Log("Chunks post filtering", len(chunks)) + sp.LogKV("Chunks post filtering", len(chunks)) chunksPerQuery.Observe(float64(len(filtered))) // Now fetch the actual chunk data from Memcache / S3 allChunks, err := c.fetcher.FetchChunks(ctx, filtered) if err != nil { - level.Error(log).Log("msg", "FetchChunks", "err", err) + sp.LogKV("msg", "FetchChunks", "err", err) return nil, err } return labelNamesFromChunks(allChunks), nil diff --git a/pkg/storage/stores/shipper/bloomshipper/client.go b/pkg/storage/stores/shipper/bloomshipper/client.go index ce70ce172c..f6da2168ae 100644 --- a/pkg/storage/stores/shipper/bloomshipper/client.go +++ b/pkg/storage/stores/shipper/bloomshipper/client.go @@ -14,6 +14,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/concurrency" + "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/common/model" @@ -23,7 +24,6 @@ import ( "github.com/grafana/loki/v3/pkg/storage/config" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" "github.com/grafana/loki/v3/pkg/util/encoding" - "github.com/grafana/loki/v3/pkg/util/spanlogger" ) const ( @@ -494,15 +494,16 @@ func newCachedListOpObjectClient(oc client.ObjectClient, ttl, interval time.Dura func (c *cachedListOpObjectClient) List(ctx context.Context, prefix string, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) { var ( - logger = spanlogger.FromContext(ctx) start = time.Now() cacheDur time.Duration ) defer func() { - logger.LogKV( - "cache_duration", cacheDur, - "total_duration", time.Since(start), - ) + if sp := opentracing.SpanFromContext(ctx); sp != nil { + sp.LogKV( + "cache_duration", cacheDur, + "total_duration", time.Since(start), + ) + } }() if delimiter != "" { diff --git a/pkg/storage/stores/shipper/indexshipper/storage/cached_client.go b/pkg/storage/stores/shipper/indexshipper/storage/cached_client.go index c7d909bc09..2aa3cfda87 100644 --- a/pkg/storage/stores/shipper/indexshipper/storage/cached_client.go +++ b/pkg/storage/stores/shipper/indexshipper/storage/cached_client.go @@ -9,11 +9,11 @@ import ( "time" "github.com/go-kit/log/level" + "github.com/opentracing/opentracing-go" "golang.org/x/sync/singleflight" "github.com/grafana/loki/v3/pkg/storage/chunk/client" util_log "github.com/grafana/loki/v3/pkg/util/log" - "github.com/grafana/loki/v3/pkg/util/spanlogger" ) const ( @@ -190,12 +190,13 @@ func (c *cachedObjectClient) buildTableNamesCache(ctx context.Context) (err erro } }() - logger := spanlogger.FromContextWithFallback(ctx, util_log.Logger) - level.Info(logger).Log("msg", "building table names cache") - now := time.Now() - defer func() { - level.Info(logger).Log("msg", "table names cache built", "duration", time.Since(now)) - }() + if sp := opentracing.SpanFromContext(ctx); sp != nil { + sp.LogKV("msg", "building table names cache") + now := time.Now() + defer func() { + sp.LogKV("msg", "table names cache built", "duration", time.Since(now)) + }() + } _, tableNames, err := c.ObjectClient.List(ctx, "", delimiter) if err != nil { @@ -276,12 +277,13 @@ func (t *table) buildCache(ctx context.Context, objectClient client.ObjectClient } }() - logger := spanlogger.FromContextWithFallback(ctx, util_log.Logger) - level.Info(logger).Log("msg", "building table cache") - now := time.Now() - defer func() { - level.Info(logger).Log("msg", "table cache built", "duration", time.Since(now)) - }() + if sp := opentracing.SpanFromContext(ctx); sp != nil { + sp.LogKV("msg", "building table cache") + now := time.Now() + defer func() { + sp.LogKV("msg", "table cache built", "duration", time.Since(now)) + }() + } objects, _, err := objectClient.List(ctx, t.name+delimiter, "") if err != nil {