Reduce tracing spans (#4842)

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
pull/4847/head
Cyril Tovena 4 years ago committed by GitHub
parent 90cc715c3a
commit 54dc737583
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      pkg/querier/queryrange/codec.go
  2. 12
      pkg/querier/queryrange/querysharding.go
  3. 19
      pkg/querier/worker/scheduler_processor.go
  4. 18
      pkg/storage/batch.go
  5. 42
      pkg/storage/chunk/cache/memcached.go
  6. 30
      pkg/storage/chunk/chunk_store.go
  7. 7
      pkg/storage/chunk/chunk_store_utils.go
  8. 19
      pkg/storage/chunk/series_store.go
  9. 4
      pkg/storage/chunk/storage/caching_index_client.go
  10. 4
      pkg/storage/chunk/util/parallel_chunk_fetch.go
  11. 7
      pkg/storage/stores/shipper/downloads/table.go
  12. 9
      pkg/storage/stores/shipper/downloads/table_manager.go
  13. 2
      pkg/storage/stores/shipper/shipper_index_client.go
  14. 4
      pkg/storage/stores/shipper/uploads/table_manager.go

@ -257,7 +257,6 @@ func (Codec) DecodeRequest(_ context.Context, r *http.Request, forwardHeaders []
}
func (Codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Request, error) {
header := make(http.Header)
queryTags := getQueryTags(ctx)
if queryTags != "" {
@ -371,9 +370,6 @@ func (Codec) DecodeResponse(ctx context.Context, r *http.Response, req queryrang
return nil, httpgrpc.Errorf(r.StatusCode, string(body))
}
sp, _ := opentracing.StartSpanFromContext(ctx, "codec.DecodeResponse")
defer sp.Finish()
var buf []byte
var err error
if buffer, ok := r.Body.(Buffer); ok {
@ -381,11 +377,9 @@ func (Codec) DecodeResponse(ctx context.Context, r *http.Response, req queryrang
} else {
buf, err = ioutil.ReadAll(r.Body)
if err != nil {
sp.LogFields(otlog.Error(err))
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err)
}
}
sp.LogFields(otlog.Int64("bytes", r.ContentLength))
switch req := req.(type) {
case *LokiSeriesRequest:

@ -10,7 +10,7 @@ import (
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
@ -90,15 +90,13 @@ type astMapperware struct {
func (ast *astMapperware) Do(ctx context.Context, r queryrange.Request) (queryrange.Response, error) {
conf, err := ast.confs.GetConf(r)
logger := util_log.WithContext(ctx, ast.logger)
// cannot shard with this timerange
if err != nil {
level.Warn(ast.logger).Log("err", err.Error(), "msg", "skipped AST mapper for request")
level.Warn(logger).Log("err", err.Error(), "msg", "skipped AST mapper for request")
return ast.next.Do(ctx, r)
}
shardedLog, ctx := spanlogger.New(ctx, "shardedEngine")
defer shardedLog.Finish()
mapper, err := logql.NewShardMapper(int(conf.RowShards), ast.metrics)
if err != nil {
return nil, err
@ -106,10 +104,10 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrange.Request) (queryra
noop, parsed, err := mapper.Parse(r.GetQuery())
if err != nil {
level.Warn(shardedLog).Log("msg", "failed mapping AST", "err", err.Error(), "query", r.GetQuery())
level.Warn(logger).Log("msg", "failed mapping AST", "err", err.Error(), "query", r.GetQuery())
return nil, err
}
level.Debug(shardedLog).Log("no-op", noop, "mapped", parsed.String())
level.Debug(logger).Log("no-op", noop, "mapped", parsed.String())
if noop {
// the ast can't be mapped to a sharded equivalent

@ -27,8 +27,6 @@ import (
"github.com/weaveworks/common/user"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc"
)
func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, reg prometheus.Registerer) (*schedulerProcessor, []services.Service) {
@ -134,18 +132,10 @@ func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_Quer
// paired with a Send.
go func() {
// We need to inject user into context for sending response back.
ctx := user.InjectOrgID(ctx, request.UserID)
tracer := opentracing.GlobalTracer()
// Ignore errors here. If we cannot get parent span, we just don't create new one.
parentSpanContext, _ := lokigrpc.GetParentSpanForRequest(tracer, request.HttpRequest)
if parentSpanContext != nil {
queueSpan, spanCtx := opentracing.StartSpanFromContextWithTracer(ctx, tracer, "querier_processor_runRequest", opentracing.ChildOf(parentSpanContext))
defer queueSpan.Finish()
ctx = spanCtx
}
logger := util_log.WithContext(ctx, sp.log)
var (
ctx = user.InjectOrgID(ctx, request.UserID)
logger = util_log.WithContext(ctx, sp.log)
)
sp.runRequest(ctx, logger, request.QueryID, request.FrontendAddress, request.StatsEnabled, request.HttpRequest)
@ -206,7 +196,6 @@ func (sp *schedulerProcessor) createFrontendClient(addr string) (client.PoolClie
middleware.ClientUserHeaderInterceptor,
dskit_middleware.PrometheusGRPCUnaryInstrumentation(sp.frontendClientRequestDuration),
}, nil)
if err != nil {
return nil, err
}

@ -7,7 +7,6 @@ import (
"github.com/cortexproject/cortex/pkg/querier/astmapper"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
@ -657,11 +656,12 @@ outer:
}
func fetchLazyChunks(ctx context.Context, chunks []*LazyChunk) error {
log, ctx := spanlogger.New(ctx, "LokiStore.fetchLazyChunks")
defer log.Finish()
start := time.Now()
stats := stats.FromContext(ctx)
var totalChunks int64
var (
totalChunks int64
start = time.Now()
stats = stats.FromContext(ctx)
logger = util_log.WithContext(ctx, util_log.Logger)
)
defer func() {
stats.AddChunksDownloadTime(time.Since(start))
stats.AddChunksDownloaded(totalChunks)
@ -677,7 +677,7 @@ func fetchLazyChunks(ctx context.Context, chunks []*LazyChunk) error {
if len(chksByFetcher) == 0 {
return nil
}
level.Debug(log).Log("msg", "loading lazy chunks", "chunks", totalChunks)
level.Debug(logger).Log("msg", "loading lazy chunks", "chunks", totalChunks)
errChan := make(chan error)
for fetcher, chunks := range chksByFetcher {
@ -696,9 +696,9 @@ func fetchLazyChunks(ctx context.Context, chunks []*LazyChunk) error {
}
chks, err := fetcher.FetchChunks(ctx, chks, keys)
if err != nil {
level.Error(util_log.Logger).Log("msg", "error fetching chunks", "err", err)
level.Error(logger).Log("msg", "error fetching chunks", "err", err)
if isInvalidChunkError(err) {
level.Error(util_log.Logger).Log("msg", "checksum of chunks does not match", "err", chunk.ErrInvalidChecksum)
level.Error(logger).Log("msg", "checksum of chunks does not match", "err", chunk.ErrInvalidChecksum)
errChan <- nil
return
}

@ -11,13 +11,12 @@ import (
"github.com/bradfitz/gomemcache/memcache"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
instr "github.com/weaveworks/common/instrument"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/math"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
)
// MemcachedConfig is config to make a Memcached
@ -126,34 +125,27 @@ func (c *Memcached) Fetch(ctx context.Context, keys []string) (found []string, b
found, bufs, missed = c.fetch(ctx, keys)
return
}
_ = instr.CollectedRequest(ctx, "Memcache.GetBatched", c.requestDuration, memcacheStatusCode, func(ctx context.Context) error {
found, bufs, missed = c.fetchKeysBatched(ctx, keys)
return nil
})
start := time.Now()
found, bufs, missed = c.fetchKeysBatched(ctx, keys)
c.requestDuration.After(ctx, "Memcache.GetBatched", "200", start)
return
}
func (c *Memcached) fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string) {
var items map[string]*memcache.Item
const method = "Memcache.GetMulti"
err := instr.CollectedRequest(ctx, method, c.requestDuration, memcacheStatusCode, func(innerCtx context.Context) error {
log, _ := spanlogger.New(innerCtx, method)
defer log.Finish()
log.LogFields(otlog.Int("keys requested", len(keys)))
var err error
items, err = c.memcache.GetMulti(keys)
log.LogFields(otlog.Int("keys found", len(items)))
// Memcached returns partial results even on error.
if err != nil {
log.Error(err)
level.Error(log).Log("msg", "Failed to get keys from memcached", "err", err)
}
return err
})
var (
err error
start = time.Now()
items map[string]*memcache.Item
)
items, err = c.memcache.GetMulti(keys)
c.requestDuration.After(ctx, "Memcache.GetMulti", memcacheStatusCode(err), start)
if err != nil {
level.Error(util_log.WithContext(ctx, c.logger)).Log(
"msg", "Failed to get keys from memcached",
"keys requested", len(keys),
"err", err,
)
return found, bufs, keys
}

@ -287,8 +287,6 @@ func (c *store) LabelNamesForMetricName(ctx context.Context, userID string, from
func (c *baseStore) validateQueryTimeRange(ctx context.Context, userID string, from *model.Time, through *model.Time) (bool, error) {
//nolint:ineffassign,staticcheck //Leaving ctx even though we don't currently use it, we want to make it available for when we might need it and hopefully will ensure us using the correct context at that time
log, ctx := spanlogger.New(ctx, "store.validateQueryTimeRange")
defer log.Span.Finish()
if *through < *from {
return false, QueryError(fmt.Sprintf("invalid query, through < from (%s < %s)", through, from))
@ -303,13 +301,13 @@ func (c *baseStore) validateQueryTimeRange(ctx context.Context, userID string, f
if from.After(now) {
// time-span start is in future ... regard as legal
level.Info(log).Log("msg", "whole timerange in future, yield empty resultset", "through", through, "from", from, "now", now)
level.Info(util_log.WithContext(ctx, util_log.Logger)).Log("msg", "whole timerange in future, yield empty resultset", "through", through, "from", from, "now", now)
return true, nil
}
if through.After(now.Add(5 * time.Minute)) {
// time-span end is in future ... regard as legal
level.Info(log).Log("msg", "adjusting end timerange from future to now", "old_through", through, "new_through", now)
level.Info(util_log.WithContext(ctx, util_log.Logger)).Log("msg", "adjusting end timerange from future to now", "old_through", through, "new_through", now)
*through = now // Avoid processing future part - otherwise some schemas could fail with eg non-existent table gripes
}
@ -317,9 +315,6 @@ func (c *baseStore) validateQueryTimeRange(ctx context.Context, userID string, f
}
func (c *baseStore) validateQuery(ctx context.Context, userID string, from *model.Time, through *model.Time, matchers []*labels.Matcher) (string, []*labels.Matcher, bool, error) {
log, ctx := spanlogger.New(ctx, "store.validateQuery")
defer log.Span.Finish()
shortcut, err := c.validateQueryTimeRange(ctx, userID, from, through)
if err != nil {
return "", nil, false, err
@ -440,10 +435,6 @@ func (c *store) lookupChunksByMetricName(ctx context.Context, userID string, fro
}
func (c *baseStore) lookupIdsByMetricNameMatcher(ctx context.Context, from, through model.Time, userID, metricName string, matcher *labels.Matcher, filter func([]IndexQuery) []IndexQuery) ([]string, error) {
formattedMatcher := formatMatcher(matcher)
log, ctx := spanlogger.New(ctx, "Store.lookupIdsByMetricNameMatcher", "metricName", metricName, "matcher", formattedMatcher)
defer log.Span.Finish()
var err error
var queries []IndexQuery
var labelName string
@ -459,11 +450,10 @@ func (c *baseStore) lookupIdsByMetricNameMatcher(ctx context.Context, from, thro
if err != nil {
return nil, err
}
level.Debug(log).Log("matcher", formattedMatcher, "queries", len(queries))
unfilteredQueries := len(queries)
if filter != nil {
queries = filter(queries)
level.Debug(log).Log("matcher", formattedMatcher, "filteredQueries", len(queries))
}
entries, err := c.lookupEntriesByQueries(ctx, queries)
@ -474,13 +464,20 @@ func (c *baseStore) lookupIdsByMetricNameMatcher(ctx context.Context, from, thro
} else if err != nil {
return nil, err
}
level.Debug(log).Log("matcher", formattedMatcher, "entries", len(entries))
ids, err := c.parseIndexEntries(ctx, entries, matcher)
if err != nil {
return nil, err
}
level.Debug(log).Log("matcher", formattedMatcher, "ids", len(ids))
level.Debug(util_log.WithContext(ctx, util_log.Logger)).
Log(
"msg", "Store.lookupIdsByMetricNameMatcher",
"matcher", formatMatcher(matcher),
"queries", unfilteredQueries,
"filteredQueries", len(queries),
"entries", len(entries),
"ids", len(ids),
)
return ids, nil
}
@ -496,9 +493,6 @@ func formatMatcher(matcher *labels.Matcher) string {
}
func (c *baseStore) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery) ([]IndexEntry, error) {
log, ctx := spanlogger.New(ctx, "store.lookupEntriesByQueries")
defer log.Span.Finish()
// Nothing to do if there are no queries.
if len(queries) == 0 {
return nil, nil

@ -201,9 +201,8 @@ func (c *Fetcher) processCacheResponse(ctx context.Context, chunks []Chunk, keys
requests = make([]decodeRequest, 0, len(keys))
responses = make(chan decodeResponse)
missing []Chunk
logger = util_log.WithContext(ctx, util_log.Logger)
)
log, _ := spanlogger.New(ctx, "Fetcher.processCacheResponse")
defer log.Span.Finish()
i, j := 0, 0
for i < len(chunks) && j < len(keys) {
@ -213,7 +212,7 @@ func (c *Fetcher) processCacheResponse(ctx context.Context, chunks []Chunk, keys
missing = append(missing, chunks[i])
i++
} else if chunkKey > keys[j] {
level.Warn(util_log.Logger).Log("msg", "got chunk from cache we didn't ask for")
level.Warn(logger).Log("msg", "got chunk from cache we didn't ask for")
j++
} else {
requests = append(requests, decodeRequest{
@ -228,7 +227,7 @@ func (c *Fetcher) processCacheResponse(ctx context.Context, chunks []Chunk, keys
for ; i < len(chunks); i++ {
missing = append(missing, chunks[i])
}
level.Debug(log).Log("chunks", len(chunks), "decodeRequests", len(requests), "missing", len(missing))
level.Debug(logger).Log("chunks", len(chunks), "decodeRequests", len(requests), "missing", len(missing))
go func() {
for _, request := range requests {

@ -15,6 +15,7 @@ import (
"github.com/cortexproject/cortex/pkg/querier/astmapper"
"github.com/cortexproject/cortex/pkg/util"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/grafana/loki/pkg/storage/chunk/cache"
@ -261,9 +262,6 @@ func (c *seriesStore) lookupLabelNamesByChunks(ctx context.Context, from, throug
}
func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from, through model.Time, userID, metricName string, matchers []*labels.Matcher) ([]string, error) {
log, ctx := spanlogger.New(ctx, "SeriesStore.lookupSeriesByMetricNameMatchers", "metricName", metricName, "matchers", len(matchers))
defer log.Span.Finish()
// Check if one of the labels is a shard annotation, pass that information to lookupSeriesByMetricNameMatcher,
// and remove the label.
shard, shardLabelIndex, err := astmapper.ShardFromMatchers(matchers)
@ -341,7 +339,8 @@ func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from
preIntersectionPerQuery.Observe(float64(preIntersectionCount))
postIntersectionPerQuery.Observe(float64(len(ids)))
level.Debug(log).Log("msg", "post intersection", "ids", len(ids))
level.Debug(util_log.WithContext(ctx, util_log.Logger)).
Log("msg", "post intersection", "matchers", len(matchers), "ids", len(ids))
return ids, nil
}
@ -352,11 +351,6 @@ func (c *seriesStore) lookupSeriesByMetricNameMatcher(ctx context.Context, from,
}
func (c *seriesStore) lookupChunksBySeries(ctx context.Context, from, through model.Time, userID string, seriesIDs []string) ([]string, error) {
log, ctx := spanlogger.New(ctx, "SeriesStore.lookupChunksBySeries")
defer log.Span.Finish()
level.Debug(log).Log("seriesIDs", len(seriesIDs))
queries := make([]IndexQuery, 0, len(seriesIDs))
for _, seriesID := range seriesIDs {
qs, err := c.schema.GetChunksForSeries(from, through, userID, []byte(seriesID))
@ -365,13 +359,16 @@ func (c *seriesStore) lookupChunksBySeries(ctx context.Context, from, through mo
}
queries = append(queries, qs...)
}
level.Debug(log).Log("queries", len(queries))
entries, err := c.lookupEntriesByQueries(ctx, queries)
if err != nil {
return nil, err
}
level.Debug(log).Log("entries", len(entries))
level.Debug(util_log.WithContext(ctx, util_log.Logger)).Log(
"msg", "SeriesStore.lookupChunksBySeries",
"seriesIDs", len(seriesIDs),
"queries", len(queries),
"entries", len(entries))
result, err := c.parseIndexEntries(ctx, entries, nil)
return result, err

@ -292,8 +292,8 @@ func (s *cachingIndexClient) cacheStore(ctx context.Context, keys []string, batc
}
func (s *cachingIndexClient) cacheFetch(ctx context.Context, keys []string) (batches []ReadBatch, missed []string) {
log, ctx := spanlogger.New(ctx, "cachingIndexClient.cacheFetch")
defer log.Finish()
log := spanlogger.FromContext(ctx)
level.Debug(log).Log("requested", len(keys))
cacheGets.Add(float64(len(keys)))

@ -23,7 +23,7 @@ var decodeContextPool = sync.Pool{
func GetParallelChunks(ctx context.Context, chunks []chunk.Chunk, f func(context.Context, *chunk.DecodeContext, chunk.Chunk) (chunk.Chunk, error)) ([]chunk.Chunk, error) {
log, ctx := spanlogger.New(ctx, "GetParallelChunks")
defer log.Finish()
log.LogFields(otlog.Int("chunks requested", len(chunks)))
log.LogFields(otlog.Int("requested", len(chunks)))
queuedChunks := make(chan chunk.Chunk)
@ -63,7 +63,7 @@ func GetParallelChunks(ctx context.Context, chunks []chunk.Chunk, f func(context
}
}
log.LogFields(otlog.Int("chunks fetched", len(result)))
log.LogFields(otlog.Int("fetched", len(result)))
if lastErr != nil {
log.Error(lastErr)
}

@ -282,10 +282,9 @@ func (t *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, ca
t.lastUsedAt = time.Now()
log, ctx := spanlogger.New(ctx, "Shipper.Downloads.Table.MultiQueries")
defer log.Span.Finish()
logger := util_log.WithContext(ctx, util_log.Logger)
level.Debug(log).Log("table-name", t.name, "query-count", len(queries))
level.Debug(logger).Log("table-name", t.name, "query-count", len(queries))
for name, db := range t.dbs {
err := db.View(func(tx *bbolt.Tx) error {
@ -306,7 +305,7 @@ func (t *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, ca
return err
}
level.Debug(log).Log("queried-db", name)
level.Debug(logger).Log("queried-db", name)
}
return nil

@ -12,7 +12,6 @@ import (
"time"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
@ -142,10 +141,8 @@ func (tm *TableManager) QueryPages(ctx context.Context, queries []chunk.IndexQue
}
func (tm *TableManager) query(ctx context.Context, tableName string, queries []chunk.IndexQuery, callback chunk_util.Callback) error {
log, ctx := spanlogger.New(ctx, "Shipper.Downloads.Query")
defer log.Span.Finish()
level.Debug(log).Log("table-name", tableName)
logger := util_log.WithContext(ctx, util_log.Logger)
level.Debug(logger).Log("table-name", tableName)
table := tm.getOrCreateTable(ctx, tableName)
@ -156,7 +153,7 @@ func (tm *TableManager) query(ctx context.Context, tableName string, queries []c
tm.tablesMtx.Lock()
defer tm.tablesMtx.Unlock()
level.Error(util_log.Logger).Log("msg", fmt.Sprintf("table %s has some problem, cleaning it up", tableName), "err", table.Err())
level.Error(logger).Log("msg", fmt.Sprintf("table %s has some problem, cleaning it up", tableName), "err", table.Err())
delete(tm.tables, tableName)
return table.Err()

@ -221,7 +221,7 @@ func (s *Shipper) BatchWrite(ctx context.Context, batch chunk.WriteBatch) error
}
func (s *Shipper) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error {
return instrument.CollectedRequest(ctx, "QUERY", instrument.NewHistogramCollector(s.metrics.requestDurationSeconds), instrument.ErrorCode, func(ctx context.Context) error {
return instrument.CollectedRequest(ctx, "Shipper.Query", instrument.NewHistogramCollector(s.metrics.requestDurationSeconds), instrument.ErrorCode, func(ctx context.Context) error {
spanLogger := spanlogger.FromContext(ctx)
if s.uploadsManager != nil {

@ -12,7 +12,6 @@ import (
"time"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
@ -108,9 +107,6 @@ func (tm *TableManager) query(ctx context.Context, tableName string, queries []c
tm.tablesMtx.RLock()
defer tm.tablesMtx.RUnlock()
log, ctx := spanlogger.New(ctx, "Shipper.Uploads.Query")
defer log.Span.Finish()
table, ok := tm.tables[tableName]
if !ok {
return nil

Loading…
Cancel
Save