refactor: Remove unnecessary spanlogger usage (#13255)

Historically, we've been using `spanlogger` to enrich our spans. That isn't the right tool for the job (as of now), because it also emits log lines. Our current usage is:
- Making noisy/important log lines less visible
- Adding unnecessary logging volume
- Spreading a bad practice

With this PR I'm moving away from `spanlogger` in all the places where the log message is poor or nonexistent. Those cases indicate that we don't care about the log line at all, only about the data injected into the span.
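For context, a minimal sketch of the pattern the PR moves to (illustrative only, not Loki code; `execQuery` and `doWork` are made-up names): attach key/values to the active span with `LogKV` instead of routing them through `spanlogger`, which would also emit a log line. A function that doesn't own the span nil-checks `opentracing.SpanFromContext` first, as the diff below does.

```go
// Sketch of the span-enrichment idioms used in this PR (not Loki's actual code).
package main

import (
	"context"

	"github.com/opentracing/opentracing-go"
)

// execQuery owns its span, so it can call LogKV on it directly.
func execQuery(ctx context.Context) error {
	sp, ctx := opentracing.StartSpanFromContext(ctx, "query.Exec")
	defer sp.Finish()
	// Key/values land on the span only; nothing is written to the log output.
	sp.LogKV("type", "range", "query", `{job="app"}`)

	return doWork(ctx)
}

// doWork only receives a context, so it must nil-check the span:
// there may be no parent span (or no tracer) registered.
func doWork(ctx context.Context) error {
	if sp := opentracing.SpanFromContext(ctx); sp != nil {
		sp.LogKV("msg", "querying store")
	}
	return nil
}

func main() {
	_ = execQuery(context.Background())
}
```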
pull/13285/head
Dylan Guedes 2 years ago committed by GitHub
parent f06eabbf0e
commit c1fada9af0
1. pkg/logql/engine.go (4)
2. pkg/logql/metrics.go (1)
3. pkg/logql/metrics_test.go (2)
4. pkg/logqlmodel/stats/context.go (29)
5. pkg/querier/http.go (35)
6. pkg/querier/querier.go (17)
7. pkg/querier/queryrange/downstreamer.go (5)
8. pkg/querier/queryrange/limits.go (6)
9. pkg/querier/queryrange/queryrangebase/query_range.go (6)
10. pkg/querier/queryrange/querysharding.go (13)
11. pkg/querier/queryrange/shard_resolver.go (4)
12. pkg/querier/queryrange/stats.go (4)
13. pkg/storage/async_store.go (7)
14. pkg/storage/chunk/cache/resultscache/cache.go (11)
15. pkg/storage/chunk/client/util/parallel_chunk_fetch.go (11)
16. pkg/storage/stores/composite_store_entry.go (7)
17. pkg/storage/stores/series/series_index_store.go (33)
18. pkg/storage/stores/shipper/bloomshipper/client.go (13)
19. pkg/storage/stores/shipper/indexshipper/storage/cached_client.go (28)

@ -36,7 +36,6 @@ import (
"github.com/grafana/loki/v3/pkg/util/httpreq"
logutil "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/server"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
"github.com/grafana/loki/v3/pkg/util/validation"
)
@ -231,7 +230,6 @@ func (q *query) resultLength(res promql_parser.Value) int {
func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "query.Exec")
defer sp.Finish()
spLogger := spanlogger.FromContext(ctx)
sp.LogKV(
"type", GetRangeType(q.params),
@ -265,7 +263,7 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) {
queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration)
statResult := statsCtx.Result(time.Since(start), queueTime, q.resultLength(data))
statResult.Log(level.Debug(spLogger))
sp.LogKV(statResult.KVList()...)
status, _ := server.ClientHTTPStatusAndError(err)

@ -377,7 +377,6 @@ func RecordStatsQueryMetrics(ctx context.Context, log log.Logger, start, end tim
"query", query,
"query_hash", util.HashedQuery(query),
"total_entries", stats.Summary.TotalEntriesReturned)
level.Info(logger).Log(logValues...)
execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime)

@ -92,8 +92,8 @@ func TestLogSlowQuery(t *testing.T) {
func TestLogLabelsQuery(t *testing.T) {
buf := bytes.NewBufferString("")
logger := log.NewLogfmtLogger(buf)
tr, c := jaeger.NewTracer("foo", jaeger.NewConstSampler(true), jaeger.NewInMemoryReporter())
logger := log.NewLogfmtLogger(buf)
defer c.Close()
opentracing.SetGlobalTracer(tr)
sp := opentracing.StartSpan("")

@ -26,6 +26,7 @@ import (
"time"
"github.com/dustin/go-humanize"
"github.com/go-kit/log"
)
@ -518,9 +519,12 @@ func (c *Context) getCacheStatsByType(t CacheType) *Cache {
return stats
}
// Log logs a query statistics result.
func (r Result) Log(log log.Logger) {
_ = log.Log(
func (r Result) Log(logger log.Logger) {
logger.Log(r.KVList()...)
}
func (r Result) KVList() []any {
result := []any{
"Ingester.TotalReached", r.Ingester.TotalReached,
"Ingester.TotalChunksMatched", r.Ingester.TotalChunksMatched,
"Ingester.TotalBatches", r.Ingester.TotalBatches,
@ -549,13 +553,14 @@ func (r Result) Log(log log.Logger) {
"Querier.CompressedBytes", humanize.Bytes(uint64(r.Querier.Store.Chunk.CompressedBytes)),
"Querier.TotalDuplicates", r.Querier.Store.Chunk.TotalDuplicates,
"Querier.QueryReferencedStructuredMetadata", r.Querier.Store.QueryReferencedStructured,
)
r.Caches.Log(log)
r.Summary.Log(log)
}
result = append(result, r.Caches.kvList()...)
return append(result, r.Summary.kvList()...)
}
func (s Summary) Log(log log.Logger) {
_ = log.Log(
func (s Summary) kvList() []any {
return []any{
"Summary.BytesProcessedPerSecond", humanize.Bytes(uint64(s.BytesProcessedPerSecond)),
"Summary.LinesProcessedPerSecond", s.LinesProcessedPerSecond,
"Summary.TotalBytesProcessed", humanize.Bytes(uint64(s.TotalBytesProcessed)),
@ -563,11 +568,11 @@ func (s Summary) Log(log log.Logger) {
"Summary.PostFilterLines", s.TotalPostFilterLines,
"Summary.ExecTime", ConvertSecondsToNanoseconds(s.ExecTime),
"Summary.QueueTime", ConvertSecondsToNanoseconds(s.QueueTime),
)
}
}
func (c Caches) Log(log log.Logger) {
_ = log.Log(
func (c Caches) kvList() []any {
return []any{
"Cache.Chunk.Requests", c.Chunk.Requests,
"Cache.Chunk.EntriesRequested", c.Chunk.EntriesRequested,
"Cache.Chunk.EntriesFound", c.Chunk.EntriesFound,
@ -620,5 +625,5 @@ func (c Caches) Log(log log.Logger) {
"Cache.InstantMetricResult.BytesSent", humanize.Bytes(uint64(c.InstantMetricResult.BytesSent)),
"Cache.InstantMetricResult.BytesReceived", humanize.Bytes(uint64(c.InstantMetricResult.BytesReceived)),
"Cache.InstantMetricResult.DownloadTime", c.InstantMetricResult.CacheDownloadTime(),
)
}
}

@ -112,15 +112,16 @@ func (q *QuerierAPI) LabelHandler(ctx context.Context, req *logproto.LabelReques
resLength = len(resp.Values)
}
statResult := statsCtx.Result(time.Since(start), queueTime, resLength)
log := spanlogger.FromContext(ctx)
statResult.Log(level.Debug(log))
if sp := opentracing.SpanFromContext(ctx); sp != nil {
sp.LogKV(statResult.KVList()...)
}
status := 200
if err != nil {
status, _ = serverutil.ClientHTTPStatusAndError(err)
}
logql.RecordLabelQueryMetrics(ctx, log, *req.Start, *req.End, req.Name, req.Query, strconv.Itoa(status), statResult)
logql.RecordLabelQueryMetrics(ctx, util_log.Logger, *req.Start, *req.End, req.Name, req.Query, strconv.Itoa(status), statResult)
return resp, err
}
@ -266,15 +267,16 @@ func (q *QuerierAPI) SeriesHandler(ctx context.Context, req *logproto.SeriesRequ
}
statResult := statsCtx.Result(time.Since(start), queueTime, resLength)
log := spanlogger.FromContext(ctx)
statResult.Log(level.Debug(log))
if sp := opentracing.SpanFromContext(ctx); sp != nil {
sp.LogKV(statResult.KVList()...)
}
status := 200
if err != nil {
status, _ = serverutil.ClientHTTPStatusAndError(err)
}
logql.RecordSeriesQueryMetrics(ctx, log, req.Start, req.End, req.Groups, strconv.Itoa(status), req.GetShards(), statResult)
logql.RecordSeriesQueryMetrics(ctx, util_log.Logger, req.Start, req.End, req.Groups, strconv.Itoa(status), req.GetShards(), statResult)
return resp, statResult, err
}
@ -296,15 +298,16 @@ func (q *QuerierAPI) IndexStatsHandler(ctx context.Context, req *loghttp.RangeQu
queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration)
statResult := statsCtx.Result(time.Since(start), queueTime, 1)
log := spanlogger.FromContext(ctx)
statResult.Log(level.Debug(log))
if sp := opentracing.SpanFromContext(ctx); sp != nil {
sp.LogKV(statResult.KVList()...)
}
status := 200
if err != nil {
status, _ = serverutil.ClientHTTPStatusAndError(err)
}
logql.RecordStatsQueryMetrics(ctx, log, req.Start, req.End, req.Query, strconv.Itoa(status), statResult)
logql.RecordStatsQueryMetrics(ctx, util_log.Logger, req.Start, req.End, req.Query, strconv.Itoa(status), statResult)
return resp, err
}
@ -327,8 +330,9 @@ func (q *QuerierAPI) IndexShardsHandler(ctx context.Context, req *loghttp.RangeQ
statResult := statsCtx.Result(time.Since(start), queueTime, resLength)
log := spanlogger.FromContext(ctx)
statResult.Log(level.Debug(log))
if sp := opentracing.SpanFromContext(ctx); sp != nil {
sp.LogKV(statResult.KVList()...)
}
status := 200
if err != nil {
@ -336,7 +340,7 @@ func (q *QuerierAPI) IndexShardsHandler(ctx context.Context, req *loghttp.RangeQ
}
logql.RecordShardsQueryMetrics(
ctx, log, req.Start, req.End, req.Query, targetBytesPerShard, strconv.Itoa(status), resLength, statResult,
ctx, util_log.Logger, req.Start, req.End, req.Query, targetBytesPerShard, strconv.Itoa(status), resLength, statResult,
)
return resp, err
@ -363,15 +367,16 @@ func (q *QuerierAPI) VolumeHandler(ctx context.Context, req *logproto.VolumeRequ
queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration)
statResult := statsCtx.Result(time.Since(start), queueTime, 1)
log := spanlogger.FromContext(ctx)
statResult.Log(level.Debug(log))
if sp := opentracing.SpanFromContext(ctx); sp != nil {
sp.LogKV(statResult.KVList()...)
}
status := 200
if err != nil {
status, _ = serverutil.ClientHTTPStatusAndError(err)
}
logql.RecordVolumeQueryMetrics(ctx, log, req.From.Time(), req.Through.Time(), req.GetQuery(), uint32(req.GetLimit()), time.Duration(req.GetStep()), strconv.Itoa(status), statResult)
logql.RecordVolumeQueryMetrics(ctx, util_log.Logger, req.From.Time(), req.Through.Time(), req.GetQuery(), uint32(req.GetLimit()), time.Duration(req.GetStep()), strconv.Itoa(status), statResult)
return resp, nil
}

@ -161,6 +161,7 @@ func (q *SingleTenantQuerier) SelectLogs(ctx context.Context, params logql.Selec
ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(params.Start, params.End)
sp := opentracing.SpanFromContext(ctx)
iters := []iter.EntryIterator{}
if !q.cfg.QueryStoreOnly && ingesterQueryInterval != nil {
// Make a copy of the request before modifying
@ -171,9 +172,11 @@ func (q *SingleTenantQuerier) SelectLogs(ctx context.Context, params logql.Selec
}
newParams.Start = ingesterQueryInterval.start
newParams.End = ingesterQueryInterval.end
level.Debug(spanlogger.FromContext(ctx)).Log(
"msg", "querying ingester",
"params", newParams)
if sp != nil {
sp.LogKV(
"msg", "querying ingester",
"params", newParams)
}
ingesterIters, err := q.ingesterQuerier.SelectLogs(ctx, newParams)
if err != nil {
return nil, err
@ -185,9 +188,11 @@ func (q *SingleTenantQuerier) SelectLogs(ctx context.Context, params logql.Selec
if !q.cfg.QueryIngesterOnly && storeQueryInterval != nil {
params.Start = storeQueryInterval.start
params.End = storeQueryInterval.end
level.Debug(spanlogger.FromContext(ctx)).Log(
"msg", "querying store",
"params", params)
if sp != nil {
sp.LogKV(
"msg", "querying store",
"params", params)
}
storeIter, err := q.store.SelectLogs(ctx, params)
if err != nil {
return nil, err

@ -6,7 +6,6 @@ import (
"reflect"
"time"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/tenant"
"github.com/opentracing/opentracing-go"
@ -19,7 +18,6 @@ import (
"github.com/grafana/loki/v3/pkg/logqlmodel"
"github.com/grafana/loki/v3/pkg/querier/plan"
"github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
)
const (
@ -144,8 +142,7 @@ func (in instance) Downstream(ctx context.Context, queries []logql.DownstreamQue
}
sp, ctx := opentracing.StartSpanFromContext(ctx, "DownstreamHandler.instance")
defer sp.Finish()
logger := spanlogger.FromContext(ctx)
level.Debug(logger).Log("shards", fmt.Sprintf("%+v", qry.Params.Shards()), "query", req.GetQuery(), "step", req.GetStep(), "handler", reflect.TypeOf(in.handler), "engine", "downstream")
sp.LogKV("shards", fmt.Sprintf("%+v", qry.Params.Shards()), "query", req.GetQuery(), "step", req.GetStep(), "handler", reflect.TypeOf(in.handler), "engine", "downstream")
res, err := in.handler.Do(ctx, req)
if err != nil {

@ -277,8 +277,6 @@ func NewQuerySizeLimiterMiddleware(
func (q *querySizeLimiter) getBytesReadForRequest(ctx context.Context, r queryrangebase.Request) (uint64, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "querySizeLimiter.getBytesReadForRequest")
defer sp.Finish()
log := spanlogger.FromContextWithFallback(ctx, q.logger)
defer log.Finish()
expr, err := syntax.ParseExpr(r.GetQuery())
if err != nil {
@ -300,7 +298,7 @@ func (q *querySizeLimiter) getBytesReadForRequest(ctx context.Context, r queryra
combinedStats := stats.MergeStats(matcherStats...)
level.Debug(log).Log(
level.Debug(q.logger).Log(
append(
combinedStats.LoggingKeyValues(),
"msg", "queried index",
@ -371,8 +369,6 @@ func (q *querySizeLimiter) Do(ctx context.Context, r queryrangebase.Request) (qu
level.Warn(log).Log("msg", "Query exceeds limits", "status", "rejected", "limit_name", q.guessLimitName(), "limit_bytes", maxBytesReadStr, "resolved_bytes", statsBytesStr)
return nil, httpgrpc.Errorf(http.StatusBadRequest, q.limitErrorTmpl, statsBytesStr, maxBytesReadStr)
}
level.Debug(log).Log("msg", "Query is within limits", "status", "accepted", "limit_name", q.guessLimitName(), "limit_bytes", maxBytesReadStr, "resolved_bytes", statsBytesStr)
}
return q.next.Do(ctx, r)

@ -17,13 +17,13 @@ import (
"github.com/grafana/dskit/httpgrpc"
jsoniter "github.com/json-iterator/go"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/log"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
)
// StatusSuccess Prometheus success result.
@ -208,15 +208,13 @@ func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ R
}
sp, ctx := opentracing.StartSpanFromContext(ctx, "ParseQueryRangeResponse") //nolint:ineffassign,staticcheck
defer sp.Finish()
log := spanlogger.FromContext(ctx)
defer log.Finish()
buf, err := bodyBuffer(r)
if err != nil {
log.Error(err)
return nil, err
}
log.LogFields(otlog.Int("bytes", len(buf)))
sp.LogKV(otlog.Int("bytes", len(buf)))
var resp PrometheusResponse
if err := json.Unmarshal(buf, &resp); err != nil {

@ -146,9 +146,10 @@ func (ast *astMapperware) checkQuerySizeLimit(ctx context.Context, bytesPerShard
}
func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
logger := spanlogger.FromContextWithFallback(
logger := util_log.WithContext(ctx, ast.logger)
spLogger := spanlogger.FromContextWithFallback(
ctx,
util_log.WithContext(ctx, ast.logger),
logger,
)
params, err := ParamsFromRequest(r)
@ -158,14 +159,14 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que
maxRVDuration, maxOffset, err := maxRangeVectorAndOffsetDuration(params.GetExpression())
if err != nil {
level.Warn(logger).Log("err", err.Error(), "msg", "failed to get range-vector and offset duration so skipped AST mapper for request")
level.Warn(spLogger).Log("err", err.Error(), "msg", "failed to get range-vector and offset duration so skipped AST mapper for request")
return ast.next.Do(ctx, r)
}
conf, err := ast.confs.GetConf(int64(model.Time(r.GetStart().UnixMilli()).Add(-maxRVDuration).Add(-maxOffset)), int64(model.Time(r.GetEnd().UnixMilli()).Add(-maxOffset)))
// cannot shard with this timerange
if err != nil {
level.Warn(logger).Log("err", err.Error(), "msg", "skipped AST mapper for request")
level.Warn(spLogger).Log("err", err.Error(), "msg", "skipped AST mapper for request")
return ast.next.Do(ctx, r)
}
@ -200,7 +201,7 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que
v := ast.limits.TSDBShardingStrategy(tenants[0])
version, err := logql.ParseShardVersion(v)
if err != nil {
level.Warn(logger).Log(
level.Warn(spLogger).Log(
"msg", "failed to parse shard version",
"fallback", version.String(),
"err", err.Error(),
@ -214,7 +215,7 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que
noop, bytesPerShard, parsed, err := mapper.Parse(params.GetExpression())
if err != nil {
level.Warn(logger).Log("msg", "failed mapping AST", "err", err.Error(), "query", r.GetQuery())
level.Warn(spLogger).Log("msg", "failed mapping AST", "err", err.Error(), "query", r.GetQuery())
return nil, err
}
level.Debug(logger).Log("no-op", noop, "mapped", parsed.String())

@ -24,6 +24,7 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/index/stats"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding"
"github.com/grafana/loki/v3/pkg/storage/types"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
"github.com/grafana/loki/v3/pkg/util/validation"
)
@ -141,8 +142,6 @@ func getStatsForMatchers(
func (r *dynamicShardResolver) GetStats(e syntax.Expr) (stats.Stats, error) {
sp, ctx := opentracing.StartSpanFromContext(r.ctx, "dynamicShardResolver.GetStats")
defer sp.Finish()
log := spanlogger.FromContext(r.ctx)
defer log.Finish()
start := time.Now()
@ -159,6 +158,7 @@ func (r *dynamicShardResolver) GetStats(e syntax.Expr) (stats.Stats, error) {
grps = append(grps, syntax.MatcherRange{})
}
log := util_log.WithContext(ctx, util_log.Logger)
results, err := getStatsForMatchers(ctx, log, r.statsHandler, r.from, r.through, grps, r.maxParallelism, r.defaultLookback)
if err != nil {
return stats.Stats{}, err

@ -195,7 +195,9 @@ func StatsCollectorMiddleware() queryrangebase.Middleware {
// Re-calculate the summary: the queueTime result is already merged so should not be updated
// Log and record metrics for the current query
responseStats.ComputeSummary(time.Since(start), 0, totalEntries)
responseStats.Log(level.Debug(logger))
if logger.Span != nil {
logger.Span.LogKV(responseStats.KVList()...)
}
}
ctxValue := ctx.Value(ctxKey)
if data, ok := ctxValue.(*queryData); ok {

@ -25,7 +25,6 @@ import (
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/index/stats"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
)
type IngesterQuerier interface {
@ -73,8 +72,6 @@ func (a *AsyncStore) GetChunks(ctx context.Context,
predicate chunk.Predicate,
storeChunksOverride *logproto.ChunkRefGroup,
) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
spanLogger := spanlogger.FromContext(ctx)
errs := make(chan error)
var storeChunks [][]chunk.Chunk
@ -98,7 +95,9 @@ func (a *AsyncStore) GetChunks(ctx context.Context,
ingesterChunks, err = a.ingesterQuerier.GetChunkIDs(ctx, from, through, predicate.Matchers...)
if err == nil {
level.Debug(spanLogger).Log("ingester-chunks-count", len(ingesterChunks))
if sp := opentracing.SpanFromContext(ctx); sp != nil {
sp.LogKV("ingester-chunks-count", len(ingesterChunks))
}
level.Debug(util_log.Logger).Log("msg", "got chunk ids from ingester", "count", len(ingesterChunks))
}
errs <- err

@ -21,8 +21,8 @@ import (
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/math"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
"github.com/grafana/loki/v3/pkg/util/validation"
)
@ -183,8 +183,6 @@ func (s ResultsCache) handleHit(ctx context.Context, r Request, extents []Extent
)
sp, ctx := opentracing.StartSpanFromContext(ctx, "handleHit")
defer sp.Finish()
log := spanlogger.FromContext(ctx)
defer log.Finish()
requests, responses, err := s.partition(r, extents)
if err != nil {
@ -426,14 +424,11 @@ func (s ResultsCache) get(ctx context.Context, key string) ([]Extent, bool) {
var resp CachedResponse
sp, ctx := opentracing.StartSpanFromContext(ctx, "unmarshal-extent") //nolint:ineffassign,staticcheck
defer sp.Finish()
log := spanlogger.FromContext(ctx)
defer log.Finish()
log.LogFields(otlog.Int("bytes", len(bufs[0])))
sp.LogFields(otlog.Int("bytes", len(bufs[0])))
if err := proto.Unmarshal(bufs[0], &resp); err != nil {
level.Error(log).Log("msg", "error unmarshalling cached value", "err", err)
log.Error(err)
level.Error(util_log.Logger).Log("msg", "error unmarshalling cached value", "err", err)
return nil, false
}

@ -4,11 +4,12 @@ import (
"context"
"sync"
"github.com/go-kit/log/level"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/grafana/loki/v3/pkg/storage/chunk"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)
var decodeContextPool = sync.Pool{
@ -21,9 +22,7 @@ var decodeContextPool = sync.Pool{
func GetParallelChunks(ctx context.Context, maxParallel int, chunks []chunk.Chunk, f func(context.Context, *chunk.DecodeContext, chunk.Chunk) (chunk.Chunk, error)) ([]chunk.Chunk, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "GetParallelChunks")
defer sp.Finish()
log := spanlogger.FromContext(ctx)
defer log.Finish()
log.LogFields(otlog.Int("requested", len(chunks)))
sp.LogFields(otlog.Int("requested", len(chunks)))
if ctx.Err() != nil {
return nil, ctx.Err()
@ -67,9 +66,9 @@ func GetParallelChunks(ctx context.Context, maxParallel int, chunks []chunk.Chun
}
}
log.LogFields(otlog.Int("fetched", len(result)))
sp.LogFields(otlog.Int("fetched", len(result)))
if lastErr != nil {
log.Error(lastErr)
level.Error(util_log.Logger).Log("msg", "error fetching chunks", "err", lastErr)
}
// Return any chunks we did receive: a partial result may be useful

@ -21,7 +21,6 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/index/stats"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
"github.com/grafana/loki/v3/pkg/util/validation"
)
@ -114,8 +113,6 @@ func (c *storeEntry) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) {
func (c *storeEntry) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, matchers ...*labels.Matcher) ([]string, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.LabelNamesForMetricName")
defer sp.Finish()
log := spanlogger.FromContext(ctx)
defer log.Span.Finish()
shortcut, err := c.validateQueryTimeRange(ctx, userID, &from, &through)
if err != nil {
@ -123,7 +120,7 @@ func (c *storeEntry) LabelNamesForMetricName(ctx context.Context, userID string,
} else if shortcut {
return nil, nil
}
level.Debug(log).Log("metric", metricName)
sp.LogKV("metric", metricName)
return c.indexReader.LabelNamesForMetricName(ctx, userID, from, through, metricName, matchers...)
}
@ -131,8 +128,6 @@ func (c *storeEntry) LabelNamesForMetricName(ctx context.Context, userID string,
func (c *storeEntry) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.LabelValuesForMetricName")
defer sp.Finish()
log := spanlogger.FromContext(ctx)
defer log.Span.Finish()
shortcut, err := c.validateQueryTimeRange(ctx, userID, &from, &through)
if err != nil {

@ -32,7 +32,6 @@ import (
"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/util/extract"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
)
var (
@ -319,15 +318,13 @@ func (c *IndexReaderWriter) chunksToSeries(ctx context.Context, in []logproto.Ch
func (c *IndexReaderWriter) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, matchers ...*labels.Matcher) ([]string, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.LabelNamesForMetricName")
defer sp.Finish()
log := spanlogger.FromContext(ctx)
defer log.Span.Finish()
// Fetch the series IDs from the index
seriesIDs, err := c.lookupSeriesByMetricNameMatchers(ctx, from, through, userID, metricName, matchers)
if err != nil {
return nil, err
}
level.Debug(log).Log("series-ids", len(seriesIDs))
sp.LogKV("series-ids", len(seriesIDs))
// Lookup the series in the index to get label names.
labelNames, err := c.lookupLabelNamesBySeries(ctx, from, through, userID, seriesIDs)
@ -336,10 +333,10 @@ func (c *IndexReaderWriter) LabelNamesForMetricName(ctx context.Context, userID
if err == series_index.ErrNotSupported {
return c.lookupLabelNamesByChunks(ctx, from, through, userID, seriesIDs)
}
level.Error(log).Log("msg", "lookupLabelNamesBySeries", "err", err)
sp.LogKV("msg", "lookupLabelNamesBySeries", "err", err)
return nil, err
}
level.Debug(log).Log("labelNames", len(labelNames))
sp.LogKV("labelNames", len(labelNames))
return labelNames, nil
}
@ -347,14 +344,12 @@ func (c *IndexReaderWriter) LabelNamesForMetricName(ctx context.Context, userID
func (c *IndexReaderWriter) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.LabelValuesForMetricName")
defer sp.Finish()
log := spanlogger.FromContext(ctx)
defer log.Span.Finish()
if len(matchers) != 0 {
return c.labelValuesForMetricNameWithMatchers(ctx, userID, from, through, metricName, labelName, matchers...)
}
level.Debug(log).Log("from", from, "through", through, "metricName", metricName, "labelName", labelName)
sp.LogKV("from", from, "through", through, "metricName", metricName, "labelName", labelName)
queries, err := c.schema.GetReadQueriesForMetricLabel(from, through, userID, metricName, labelName)
if err != nil {
@ -634,10 +629,8 @@ func (c *IndexReaderWriter) lookupEntriesByQueries(ctx context.Context, queries
func (c *IndexReaderWriter) lookupLabelNamesBySeries(ctx context.Context, from, through model.Time, userID string, seriesIDs []string) ([]string, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.lookupLabelNamesBySeries")
defer sp.Finish()
log := spanlogger.FromContext(ctx)
defer log.Span.Finish()
level.Debug(log).Log("seriesIDs", len(seriesIDs))
sp.LogKV("seriesIDs", len(seriesIDs))
queries := make([]series_index.Query, 0, len(seriesIDs))
for _, seriesID := range seriesIDs {
qs, err := c.schema.GetLabelNamesForSeries(from, through, userID, []byte(seriesID))
@ -646,7 +639,7 @@ func (c *IndexReaderWriter) lookupLabelNamesBySeries(ctx context.Context, from,
}
queries = append(queries, qs...)
}
level.Debug(log).Log("queries", len(queries))
sp.LogKV("queries", len(queries))
entries := entriesPool.Get().(*[]series_index.Entry)
defer entriesPool.Put(entries)
err := c.lookupEntriesByQueries(ctx, queries, entries)
@ -654,7 +647,7 @@ func (c *IndexReaderWriter) lookupLabelNamesBySeries(ctx context.Context, from,
return nil, err
}
level.Debug(log).Log("entries", len(*entries))
sp.LogKV("entries", len(*entries))
var result util.UniqueStrings
for _, entry := range *entries {
@ -671,34 +664,32 @@ func (c *IndexReaderWriter) lookupLabelNamesBySeries(ctx context.Context, from,
func (c *IndexReaderWriter) lookupLabelNamesByChunks(ctx context.Context, from, through model.Time, userID string, seriesIDs []string) ([]string, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.lookupLabelNamesByChunks")
defer sp.Finish()
log := spanlogger.FromContext(ctx)
defer log.Span.Finish()
// Lookup the series in the index to get the chunks.
chunkIDs, err := c.lookupChunksBySeries(ctx, from, through, userID, seriesIDs)
if err != nil {
level.Error(log).Log("msg", "lookupChunksBySeries", "err", err)
sp.LogKV("msg", "lookupChunksBySeries", "err", err)
return nil, err
}
level.Debug(log).Log("chunk-ids", len(chunkIDs))
sp.LogKV("chunk-ids", len(chunkIDs))
chunks, err := c.convertChunkIDsToChunks(ctx, userID, chunkIDs)
if err != nil {
level.Error(log).Log("err", "convertChunkIDsToChunks", "err", err)
sp.LogKV("err", "convertChunkIDsToChunks", "err", err)
return nil, err
}
// Filter out chunks that are not in the selected time range and keep a single chunk per fingerprint
filtered := filterChunksByTime(from, through, chunks)
filtered = filterChunksByUniqueFingerprint(filtered)
level.Debug(log).Log("Chunks post filtering", len(chunks))
sp.LogKV("Chunks post filtering", len(chunks))
chunksPerQuery.Observe(float64(len(filtered)))
// Now fetch the actual chunk data from Memcache / S3
allChunks, err := c.fetcher.FetchChunks(ctx, filtered)
if err != nil {
level.Error(log).Log("msg", "FetchChunks", "err", err)
sp.LogKV("msg", "FetchChunks", "err", err)
return nil, err
}
return labelNamesFromChunks(allChunks), nil

@ -14,6 +14,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
@ -23,7 +24,6 @@ import (
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/v3/pkg/util/encoding"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
)
const (
@ -494,15 +494,16 @@ func newCachedListOpObjectClient(oc client.ObjectClient, ttl, interval time.Dura
func (c *cachedListOpObjectClient) List(ctx context.Context, prefix string, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
var (
logger = spanlogger.FromContext(ctx)
start = time.Now()
cacheDur time.Duration
)
defer func() {
logger.LogKV(
"cache_duration", cacheDur,
"total_duration", time.Since(start),
)
if sp := opentracing.SpanFromContext(ctx); sp != nil {
sp.LogKV(
"cache_duration", cacheDur,
"total_duration", time.Since(start),
)
}
}()
if delimiter != "" {

@ -9,11 +9,11 @@ import (
"time"
"github.com/go-kit/log/level"
"github.com/opentracing/opentracing-go"
"golang.org/x/sync/singleflight"
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
)
const (
@ -190,12 +190,13 @@ func (c *cachedObjectClient) buildTableNamesCache(ctx context.Context) (err erro
}
}()
logger := spanlogger.FromContextWithFallback(ctx, util_log.Logger)
level.Info(logger).Log("msg", "building table names cache")
now := time.Now()
defer func() {
level.Info(logger).Log("msg", "table names cache built", "duration", time.Since(now))
}()
if sp := opentracing.SpanFromContext(ctx); sp != nil {
sp.LogKV("msg", "building table names cache")
now := time.Now()
defer func() {
sp.LogKV("msg", "table names cache built", "duration", time.Since(now))
}()
}
_, tableNames, err := c.ObjectClient.List(ctx, "", delimiter)
if err != nil {
@ -276,12 +277,13 @@ func (t *table) buildCache(ctx context.Context, objectClient client.ObjectClient
}
}()
logger := spanlogger.FromContextWithFallback(ctx, util_log.Logger)
level.Info(logger).Log("msg", "building table cache")
now := time.Now()
defer func() {
level.Info(logger).Log("msg", "table cache built", "duration", time.Since(now))
}()
if sp := opentracing.SpanFromContext(ctx); sp != nil {
sp.LogKV("msg", "building table cache")
now := time.Now()
defer func() {
sp.LogKV("msg", "table cache built", "duration", time.Since(now))
}()
}
objects, _, err := objectClient.List(ctx, t.name+delimiter, "")
if err != nil {
