diff --git a/pkg/storage/stores/shipper/downloads/index_set.go b/pkg/storage/stores/shipper/downloads/index_set.go
index 0c28985e48..a989bac192 100644
--- a/pkg/storage/stores/shipper/downloads/index_set.go
+++ b/pkg/storage/stores/shipper/downloads/index_set.go
@@ -96,13 +96,13 @@ func (t *indexSet) Init() (err error) {
 
 	defer func() {
 		if err != nil {
-			level.Error(util_log.Logger).Log("msg", fmt.Sprintf("failed to initialize table %s, cleaning it up", t.tableName), "err", err)
+			level.Error(t.logger).Log("msg", "failed to initialize table, cleaning it up", "err", err)
 			t.err = err
 
 			// cleaning up files due to error to avoid returning invalid results.
 			for fileName := range t.dbs {
 				if err := t.cleanupDB(fileName); err != nil {
-					level.Error(util_log.Logger).Log("msg", "failed to cleanup partially downloaded file", "filename", fileName, "err", err)
+					level.Error(t.logger).Log("msg", "failed to cleanup partially downloaded file", "filename", fileName, "err", err)
 				}
 			}
 		}
@@ -194,7 +194,7 @@ func (t *indexSet) MultiQueries(ctx context.Context, queries []chunk.IndexQuery,
 	t.lastUsedAt = time.Now()
 
 	logger := util_log.WithContext(ctx, t.logger)
-	level.Debug(logger).Log("table-name", t.tableName, "query-count", len(queries))
+	level.Debug(logger).Log("query-count", len(queries), "dbs-count", len(t.dbs))
 
 	for name, db := range t.dbs {
 		err := db.View(func(tx *bbolt.Tx) error {
@@ -279,7 +279,7 @@ func (t *indexSet) Sync(ctx context.Context) (err error) {
 
 // sync downloads updated and new files from the storage relevant for the table and removes the deleted ones
 func (t *indexSet) sync(ctx context.Context, lock bool) (err error) {
-	level.Debug(util_log.Logger).Log("msg", fmt.Sprintf("syncing files for table %s", t.tableName))
+	level.Debug(t.logger).Log("msg", "syncing index files")
 
 	defer func() {
 		status := statusSuccess
@@ -294,7 +294,7 @@ func (t *indexSet) sync(ctx context.Context, lock bool) (err error) {
 		return err
 	}
 
-	level.Debug(util_log.Logger).Log("msg", fmt.Sprintf("updates for table %s. toDownload: %s, toDelete: %s", t.tableName, toDownload, toDelete))
+	level.Debug(t.logger).Log("msg", "index sync updates", "toDownload", fmt.Sprint(toDownload), "toDelete", fmt.Sprint(toDelete))
 
 	downloadedFiles, err := t.doConcurrentDownload(ctx, toDownload)
 	if err != nil {
diff --git a/pkg/storage/stores/shipper/downloads/table.go b/pkg/storage/stores/shipper/downloads/table.go
index 02f07a6cee..16d9b3a84c 100644
--- a/pkg/storage/stores/shipper/downloads/table.go
+++ b/pkg/storage/stores/shipper/downloads/table.go
@@ -19,6 +19,7 @@ import (
 	"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
 	"github.com/grafana/loki/pkg/tenant"
 	util_log "github.com/grafana/loki/pkg/util/log"
+	"github.com/grafana/loki/pkg/util/spanlogger"
 )
 
 // timeout for downloading initial files for a table to avoid leaking resources by allowing it to take all the time.
@@ -158,7 +159,7 @@ func (t *table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, ca
 
 	// query both user and common index
 	for _, uid := range []string{userID, ""} {
-		indexSet, err := t.getOrCreateIndexSet(uid, true)
+		indexSet, err := t.getOrCreateIndexSet(ctx, uid, true)
 		if err != nil {
 			return err
 		}
@@ -258,7 +259,7 @@ func (t *table) Sync(ctx context.Context) error {
 // Caller can use IndexSet.AwaitReady() to wait until the IndexSet gets ready, if required.
 // forQuerying must be set to true only getting the index for querying since
 // it captures the amount of time it takes to download the index at query time.
-func (t *table) getOrCreateIndexSet(id string, forQuerying bool) (IndexSet, error) {
+func (t *table) getOrCreateIndexSet(ctx context.Context, id string, forQuerying bool) (IndexSet, error) {
 	t.indexSetsMtx.RLock()
 	indexSet, ok := t.indexSets[id]
 	t.indexSetsMtx.RUnlock()
@@ -293,9 +294,11 @@ func (t *table) getOrCreateIndexSet(id string, forQuerying bool) (IndexSet, erro
 	if forQuerying {
 		start := time.Now()
 		defer func() {
-			duration := time.Since(start).Seconds()
-			t.metrics.queryTimeTableDownloadDurationSeconds.WithLabelValues(t.name).Add(duration)
-			level.Info(loggerWithUserID(t.logger, id)).Log("msg", "downloaded index set at query time", "duration", duration)
+			duration := time.Since(start)
+			t.metrics.queryTimeTableDownloadDurationSeconds.WithLabelValues(t.name).Add(duration.Seconds())
+
+			logger := spanlogger.FromContextWithFallback(ctx, loggerWithUserID(t.logger, id))
+			level.Info(logger).Log("msg", "downloaded index set at query time", "duration", duration)
 		}()
 	}
 
@@ -311,7 +314,7 @@ func (t *table) getOrCreateIndexSet(id string, forQuerying bool) (IndexSet, erro
 // EnsureQueryReadiness ensures that we have downloaded the common index as well as user index for the provided userIDs.
 // When ensuring query readiness for a table, we will always download common index set because it can include index for one of the provided user ids.
 func (t *table) EnsureQueryReadiness(ctx context.Context, userIDs []string) error {
-	commonIndexSet, err := t.getOrCreateIndexSet("", false)
+	commonIndexSet, err := t.getOrCreateIndexSet(ctx, "", false)
 	if err != nil {
 		return err
 	}
@@ -339,7 +342,7 @@ func (t *table) EnsureQueryReadiness(ctx context.Context, userIDs []string) erro
 // downloadUserIndexes downloads user specific index files concurrently.
 func (t *table) downloadUserIndexes(ctx context.Context, userIDs []string) error {
 	return concurrency.ForEachJob(ctx, len(userIDs), maxDownloadConcurrency, func(ctx context.Context, idx int) error {
-		indexSet, err := t.getOrCreateIndexSet(userIDs[idx], false)
+		indexSet, err := t.getOrCreateIndexSet(ctx, userIDs[idx], false)
 		if err != nil {
 			return err
 		}
diff --git a/pkg/storage/stores/shipper/storage/cached_client.go b/pkg/storage/stores/shipper/storage/cached_client.go
index 96ecc7f80d..e8391742fa 100644
--- a/pkg/storage/stores/shipper/storage/cached_client.go
+++ b/pkg/storage/stores/shipper/storage/cached_client.go
@@ -12,6 +12,7 @@ import (
 
 	"github.com/grafana/loki/pkg/storage/chunk"
 	util_log "github.com/grafana/loki/pkg/util/log"
+	"github.com/grafana/loki/pkg/util/spanlogger"
 )
 
 const (
@@ -124,6 +125,13 @@ func (c *cachedObjectClient) buildCache(ctx context.Context) error {
 		return nil
 	}
 
+	logger := spanlogger.FromContextWithFallback(ctx, util_log.Logger)
+	level.Info(logger).Log("msg", "building index list cache")
+	now := time.Now()
+	defer func() {
+		level.Info(logger).Log("msg", "index list cache built", "duration", time.Since(now))
+	}()
+
 	objects, _, err := c.ObjectClient.List(ctx, "", "")
 	if err != nil {
 		return err
diff --git a/pkg/storage/stores/shipper/util/queries.go b/pkg/storage/stores/shipper/util/queries.go
index 3108e7e775..b7eaa975ed 100644
--- a/pkg/storage/stores/shipper/util/queries.go
+++ b/pkg/storage/stores/shipper/util/queries.go
@@ -4,8 +4,11 @@ import (
 	"context"
 	"sync"
 
+	"github.com/go-kit/log/level"
+
 	"github.com/grafana/loki/pkg/storage/chunk"
 	util_math "github.com/grafana/loki/pkg/util/math"
+	"github.com/grafana/loki/pkg/util/spanlogger"
 )
 
 const maxQueriesPerGoroutine = 100
@@ -29,9 +32,18 @@ func QueriesByTable(queries []chunk.IndexQuery) map[string][]chunk.IndexQuery {
 }
 
 func DoParallelQueries(ctx context.Context, tableQuerier TableQuerier, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error {
+	if len(queries) == 0 {
+		return nil
+	}
 	errs := make(chan error)
 	id := NewIndexDeduper(callback)
 
+	defer func() {
+		logger := spanlogger.FromContext(ctx)
+		level.Debug(logger).Log("msg", "done processing index queries", "table-name", queries[0].TableName,
+			"query-count", len(queries), "num-entries-sent", id.numEntriesSent)
+	}()
+
 	if len(queries) <= maxQueriesPerGoroutine {
 		return tableQuerier.MultiQueries(ctx, queries, id.Callback)
 	}
@@ -59,6 +71,7 @@ func DoParallelQueries(ctx context.Context, tableQuerier TableQuerier, queries [
 
 type IndexDeduper struct {
 	callback        chunk.QueryPagesCallback
 	seenRangeValues map[string]map[string]struct{}
+	numEntriesSent  int
 	mtx             sync.RWMutex
 }
@@ -105,6 +118,7 @@ func (i *IndexDeduper) isSeen(hashValue string, rangeValue []byte) bool {
 
 	// add the rangeValue
 	i.seenRangeValues[hashValue][rangeValueStr] = struct{}{}
+	i.numEntriesSent++
 	return false
 }