diff --git a/pkg/chunkenc/decompression/context.go b/pkg/chunkenc/decompression/context.go index e5221c952f..3137bc1add 100644 --- a/pkg/chunkenc/decompression/context.go +++ b/pkg/chunkenc/decompression/context.go @@ -11,11 +11,11 @@ const ctxKey ctxKeyType = "decompression" // Stats is decompression statistic type Stats struct { - TimeDecompress time.Duration // Time spent decompressing chunks - TimeFiltering time.Duration // Time spent filtering lines BytesDecompressed int64 // Total bytes decompressed data size BytesCompressed int64 // Total bytes compressed read FetchedChunks int64 // Total number of chunks fetched. + TotalDuplicates int64 // Total number of line duplicates from replication. + TimeFetching time.Duration // Time spent fetching chunks. } // NewContext creates a new decompression context diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 6298d26ddb..5f997bdaf4 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -560,8 +560,6 @@ func (li *listIterator) Labels() string { return "" } type bufferedIterator struct { origBytes []byte rootCtx context.Context - timeDecompress time.Duration - timeFiltering time.Duration bytesDecompressed int64 bufReader *bufio.Reader @@ -600,21 +598,16 @@ func (si *bufferedIterator) Next() bool { } for { - start := time.Now() ts, line, ok := si.moveNext() - si.timeDecompress += time.Since(start) if !ok { si.Close() return false } // we decode always the line length and ts as varint si.bytesDecompressed += int64(len(line)) + 2*binary.MaxVarintLen64 - start = time.Now() if si.filter != nil && !si.filter(line) { - si.timeFiltering += time.Since(start) continue } - si.timeFiltering += time.Since(start) si.cur.Line = string(line) si.cur.Timestamp = time.Unix(0, ts) return true @@ -690,8 +683,6 @@ func (si *bufferedIterator) Close() error { func (si *bufferedIterator) close() { decompression.Mutate(si.rootCtx, func(current *decompression.Stats) { - current.TimeDecompress += si.timeDecompress - current.TimeFiltering += si.timeFiltering current.BytesDecompressed += si.bytesDecompressed current.BytesCompressed += int64(len(si.origBytes)) }) diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 0e844ab660..7b3715b992 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -209,7 +209,7 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie return err } - iter := iter.NewHeapIterator(iters, req.Direction) + iter := iter.NewHeapIterator(queryServer.Context(), iters, req.Direction) defer helpers.LogError("closing iterator", iter.Close) return sendBatches(iter, queryServer, req.Limit) diff --git a/pkg/iter/iterator.go b/pkg/iter/iterator.go index 4c15802a5e..3b030b3343 100644 --- a/pkg/iter/iterator.go +++ b/pkg/iter/iterator.go @@ -2,10 +2,12 @@ package iter import ( "container/heap" + "context" "fmt" "io" "time" + "github.com/grafana/loki/pkg/chunkenc/decompression" "github.com/grafana/loki/pkg/helpers" "github.com/grafana/loki/pkg/logproto" ) @@ -130,17 +132,19 @@ type heapIterator struct { } is []EntryIterator prefetched bool + ctx context.Context - tuples []tuple - currEntry logproto.Entry - currLabels string - errs []error + tuples []tuple + currEntry logproto.Entry + currLabels string + errs []error + linesDuplicate int64 } // NewHeapIterator returns a new iterator which uses a heap to merge together // entries for multiple interators. -func NewHeapIterator(is []EntryIterator, direction logproto.Direction) HeapIterator { - result := &heapIterator{is: is} +func NewHeapIterator(ctx context.Context, is []EntryIterator, direction logproto.Direction) HeapIterator { + result := &heapIterator{is: is, ctx: ctx} switch direction { case logproto.BACKWARD: result.heap = &iteratorMaxHeap{} @@ -233,7 +237,12 @@ func (i *heapIterator) Next() bool { // Requeue the iterators, advancing them if they were consumed. for j := range i.tuples { - i.requeue(i.tuples[j].EntryIterator, i.tuples[j].Line != i.currEntry.Line) + if i.tuples[j].Line != i.currEntry.Line { + i.requeue(i.tuples[j].EntryIterator, true) + continue + } + i.linesDuplicate++ + i.requeue(i.tuples[j].EntryIterator, false) } i.tuples = i.tuples[:0] return true @@ -302,6 +311,9 @@ func (i *heapIterator) Error() error { } func (i *heapIterator) Close() error { + decompression.Mutate(i.ctx, func(m *decompression.Stats) { + m.TotalDuplicates += i.linesDuplicate + }) for i.heap.Len() > 0 { if err := i.heap.Pop().(EntryIterator).Close(); err != nil { return err @@ -325,21 +337,21 @@ func (i *heapIterator) Len() int { } // NewStreamsIterator returns an iterator over logproto.Stream -func NewStreamsIterator(streams []*logproto.Stream, direction logproto.Direction) EntryIterator { +func NewStreamsIterator(ctx context.Context, streams []*logproto.Stream, direction logproto.Direction) EntryIterator { is := make([]EntryIterator, 0, len(streams)) for i := range streams { is = append(is, NewStreamIterator(streams[i])) } - return NewHeapIterator(is, direction) + return NewHeapIterator(ctx, is, direction) } // NewQueryResponseIterator returns an iterator over a QueryResponse. -func NewQueryResponseIterator(resp *logproto.QueryResponse, direction logproto.Direction) EntryIterator { +func NewQueryResponseIterator(ctx context.Context, resp *logproto.QueryResponse, direction logproto.Direction) EntryIterator { is := make([]EntryIterator, 0, len(resp.Streams)) for i := range resp.Streams { is = append(is, NewStreamIterator(resp.Streams[i])) } - return NewHeapIterator(is, direction) + return NewHeapIterator(ctx, is, direction) } type queryClientIterator struct { @@ -367,7 +379,7 @@ func (i *queryClientIterator) Next() bool { return false } - i.curr = NewQueryResponseIterator(batch, i.direction) + i.curr = NewQueryResponseIterator(i.client.Context(), batch, i.direction) } return true diff --git a/pkg/iter/iterator_test.go b/pkg/iter/iterator_test.go index 54efb3a35f..698464da5e 100644 --- a/pkg/iter/iterator_test.go +++ b/pkg/iter/iterator_test.go @@ -1,6 +1,7 @@ package iter import ( + "context" "fmt" "sort" "testing" @@ -40,7 +41,7 @@ func TestIterator(t *testing.T) { // Test dedupe of overlapping iterators with the heap iterator. { - iterator: NewHeapIterator([]EntryIterator{ + iterator: NewHeapIterator(context.Background(), []EntryIterator{ mkStreamIterator(offset(0, identity), defaultLabels), mkStreamIterator(offset(testSize/2, identity), defaultLabels), mkStreamIterator(offset(testSize, identity), defaultLabels), @@ -52,7 +53,7 @@ func TestIterator(t *testing.T) { // Test dedupe of overlapping iterators with the heap iterator (backward). { - iterator: NewHeapIterator([]EntryIterator{ + iterator: NewHeapIterator(context.Background(), []EntryIterator{ mkStreamIterator(inverse(offset(0, identity)), defaultLabels), mkStreamIterator(inverse(offset(-testSize/2, identity)), defaultLabels), mkStreamIterator(inverse(offset(-testSize, identity)), defaultLabels), @@ -64,7 +65,7 @@ func TestIterator(t *testing.T) { // Test dedupe of entries with the same timestamp but different entries. { - iterator: NewHeapIterator([]EntryIterator{ + iterator: NewHeapIterator(context.Background(), []EntryIterator{ mkStreamIterator(offset(0, constant(0)), defaultLabels), mkStreamIterator(offset(0, constant(0)), defaultLabels), mkStreamIterator(offset(testSize, constant(0)), defaultLabels), @@ -105,7 +106,7 @@ func TestIteratorMultipleLabels(t *testing.T) { }{ // Test merging with differing labels but same timestamps and values. { - iterator: NewHeapIterator([]EntryIterator{ + iterator: NewHeapIterator(context.Background(), []EntryIterator{ mkStreamIterator(identity, "{foobar: \"baz1\"}"), mkStreamIterator(identity, "{foobar: \"baz2\"}"), }, logproto.FORWARD), @@ -123,7 +124,7 @@ func TestIteratorMultipleLabels(t *testing.T) { // Test merging with differing labels but all the same timestamps and different values. { - iterator: NewHeapIterator([]EntryIterator{ + iterator: NewHeapIterator(context.Background(), []EntryIterator{ mkStreamIterator(constant(0), "{foobar: \"baz1\"}"), mkStreamIterator(constant(0), "{foobar: \"baz2\"}"), }, logproto.FORWARD), @@ -177,7 +178,7 @@ func TestHeapIteratorPrefetch(t *testing.T) { t.Run(testName, func(t *testing.T) { t.Parallel() - i := NewHeapIterator([]EntryIterator{ + i := NewHeapIterator(context.Background(), []EntryIterator{ mkStreamIterator(identity, "{foobar: \"baz1\"}"), mkStreamIterator(identity, "{foobar: \"baz2\"}"), }, logproto.FORWARD) @@ -279,7 +280,7 @@ func TestReverseEntryIterator(t *testing.T) { itr1 := mkStreamIterator(inverse(offset(testSize, identity)), defaultLabels) itr2 := mkStreamIterator(inverse(offset(testSize, identity)), "{foobar: \"bazbar\"}") - heapIterator := NewHeapIterator([]EntryIterator{itr1, itr2}, logproto.BACKWARD) + heapIterator := NewHeapIterator(context.Background(), []EntryIterator{itr1, itr2}, logproto.BACKWARD) reversedIter, err := NewReversedIter(heapIterator, testSize, false) require.NoError(t, err) @@ -301,7 +302,7 @@ func TestReverseEntryIteratorUnlimited(t *testing.T) { itr1 := mkStreamIterator(offset(testSize, identity), defaultLabels) itr2 := mkStreamIterator(offset(testSize, identity), "{foobar: \"bazbar\"}") - heapIterator := NewHeapIterator([]EntryIterator{itr1, itr2}, logproto.BACKWARD) + heapIterator := NewHeapIterator(context.Background(), []EntryIterator{itr1, itr2}, logproto.BACKWARD) reversedIter, err := NewReversedIter(heapIterator, 0, false) require.NoError(t, err) diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index 693ef94583..aa2945fb8d 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -169,8 +169,8 @@ func (ng *Engine) exec(ctx context.Context, q *query) (promql.Value, error) { defer func() { stats := decompression.GetStats(ctx) level.Debug(log).Log( - "Time Decompressing (ms)", stats.TimeDecompress.Nanoseconds()/int64(time.Millisecond), - "Time Filtering (ms)", stats.TimeFiltering.Nanoseconds()/int64(time.Millisecond), + "Time Fetching chunk (ms)", stats.TimeFetching.Nanoseconds()/int64(time.Millisecond), + "Total Duplicates", stats.TotalDuplicates, "Fetched chunks", stats.FetchedChunks, "Total bytes compressed (MB)", stats.BytesCompressed/1024/1024, "Total bytes uncompressed (MB)", stats.BytesDecompressed/1024/1024, diff --git a/pkg/logql/engine_test.go b/pkg/logql/engine_test.go index f0e2e7e6c2..547c93d3b3 100644 --- a/pkg/logql/engine_test.go +++ b/pkg/logql/engine_test.go @@ -771,7 +771,7 @@ func getLocalQuerier(size int64) Querier { iter.NewStreamIterator(newStream(size, identity, `{app="bar"}`)), } return QuerierFunc(func(ctx context.Context, p SelectParams) (iter.EntryIterator, error) { - return iter.NewHeapIterator(iters, p.Direction), nil + return iter.NewHeapIterator(ctx, iters, p.Direction), nil }) } @@ -799,7 +799,7 @@ func (q *querierRecorder) Select(ctx context.Context, p SelectParams) (iter.Entr for _, s := range streams { iters = append(iters, iter.NewStreamIterator(s)) } - return iter.NewHeapIterator(iters, p.Direction), nil + return iter.NewHeapIterator(ctx, iters, p.Direction), nil } func paramsID(p SelectParams) string { diff --git a/pkg/logql/range_vector_test.go b/pkg/logql/range_vector_test.go index 879b6c7e42..d7a463375f 100644 --- a/pkg/logql/range_vector_test.go +++ b/pkg/logql/range_vector_test.go @@ -1,6 +1,7 @@ package logql import ( + "context" "fmt" "testing" "time" @@ -29,7 +30,7 @@ var labelFoo, _ = promql.ParseMetric("{app=\"foo\"}") var labelBar, _ = promql.ParseMetric("{app=\"bar\"}") func newEntryIterator() iter.EntryIterator { - return iter.NewHeapIterator([]iter.EntryIterator{ + return iter.NewHeapIterator(context.Background(), []iter.EntryIterator{ iter.NewStreamIterator(&logproto.Stream{ Labels: labelFoo.String(), Entries: entries, diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 410e2b0ccc..18c7fa6a6e 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -158,7 +158,7 @@ func (q *Querier) Select(ctx context.Context, params logql.SelectParams) (iter.E return nil, err } iterators := append(ingesterIterators, chunkStoreIterators) - return iter.NewHeapIterator(iterators, params.Direction), nil + return iter.NewHeapIterator(ctx, iterators, params.Direction), nil } func (q *Querier) queryIngesters(ctx context.Context, params logql.SelectParams) ([]iter.EntryIterator, error) { diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go index 8516d627ea..034af92629 100644 --- a/pkg/querier/querier_mock_test.go +++ b/pkg/querier/querier_mock_test.go @@ -60,6 +60,10 @@ func (c *querierClientMock) Series(ctx context.Context, in *logproto.SeriesReque return res.(*logproto.SeriesResponse), args.Error(1) } +func (c *querierClientMock) Context() context.Context { + return context.Background() +} + // newIngesterClientMockFactory creates a factory function always returning // the input querierClientMock func newIngesterClientMockFactory(c *querierClientMock) cortex_client.Factory { @@ -118,6 +122,10 @@ func (c *queryClientMock) RecvMsg(m interface{}) error { return nil } +func (c *queryClientMock) Context() context.Context { + return context.Background() +} + // tailClientMock is mockable version of Querier_TailClient type tailClientMock struct { util.ExtendedMock @@ -290,7 +298,7 @@ func mockStreamIterFromLabelSets(from, quantity int, sets []string) iter.EntryIt streams = append(streams, mockStreamWithLabels(from, quantity, s)) } - return iter.NewStreamsIterator(streams, logproto.FORWARD) + return iter.NewStreamsIterator(context.Background(), streams, logproto.FORWARD) } // mockStream return a stream with quantity entries, where entries timestamp and diff --git a/pkg/querier/tail.go b/pkg/querier/tail.go index 48abc3fc48..e3eba5a155 100644 --- a/pkg/querier/tail.go +++ b/pkg/querier/tail.go @@ -1,6 +1,7 @@ package querier import ( + "context" "sync" "time" @@ -268,7 +269,7 @@ func newTailer( waitEntryThrottle time.Duration, ) *Tailer { t := Tailer{ - openStreamIterator: iter.NewHeapIterator([]iter.EntryIterator{historicEntries}, logproto.FORWARD), + openStreamIterator: iter.NewHeapIterator(context.Background(), []iter.EntryIterator{historicEntries}, logproto.FORWARD), querierTailClients: querierTailClients, delayFor: delayFor, responseChan: make(chan *loghttp.TailResponse, maxBufferedTailResponses), diff --git a/pkg/storage/iterator.go b/pkg/storage/iterator.go index 80ad857336..9cef67a8da 100644 --- a/pkg/storage/iterator.go +++ b/pkg/storage/iterator.go @@ -302,7 +302,7 @@ func newChunksIterator(ctx context.Context, chunks []*chunkenc.LazyChunk, matche return nil, err } - return iter.NewHeapIterator(iters, direction), nil + return iter.NewHeapIterator(ctx, iters, direction), nil } func buildIterators(ctx context.Context, chks map[model.Fingerprint][][]*chunkenc.LazyChunk, filter logql.Filter, direction logproto.Direction, from, through time.Time) ([]iter.EntryIterator, error) { @@ -341,7 +341,7 @@ func buildHeapIterator(ctx context.Context, chks [][]*chunkenc.LazyChunk, filter result = append(result, iter.NewNonOverlappingIterator(iterators, labels)) } - return iter.NewHeapIterator(result, direction), nil + return iter.NewHeapIterator(ctx, result, direction), nil } func filterSeriesByMatchers(chks map[model.Fingerprint][][]*chunkenc.LazyChunk, matchers []*labels.Matcher) map[model.Fingerprint][][]*chunkenc.LazyChunk { @@ -360,6 +360,10 @@ outer: func fetchLazyChunks(ctx context.Context, chunks []*chunkenc.LazyChunk) error { log, ctx := spanlogger.New(ctx, "LokiStore.fetchLazyChunks") defer log.Finish() + start := time.Now() + defer decompression.Mutate(ctx, func(m *decompression.Stats) { + m.TimeFetching += time.Since(start) + }) var totalChunks int chksByFetcher := map[*chunk.Fetcher][]*chunkenc.LazyChunk{}