From 3af7b7900ef8637d352591b6d02ee67165b60d5f Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Tue, 1 Feb 2022 08:46:20 +0100 Subject: [PATCH] Refactor HeapIterator into Merge and Sort Iterator. (#5281) * Refactor HeapIterator into merge and sort Iterator. Signed-off-by: Cyril Tovena * lint Signed-off-by: Cyril Tovena --- pkg/chunkenc/memchunk.go | 10 +- pkg/chunkenc/unordered.go | 4 +- pkg/ingester/ingester.go | 23 ++-- pkg/ingester/instance.go | 8 +- pkg/ingester/instance_test.go | 9 +- pkg/ingester/stream.go | 4 +- pkg/iter/entry_iterator.go | 185 ++++++++++++++++++++++++------- pkg/iter/entry_iterator_test.go | 166 ++++++++++++++++++++++----- pkg/iter/sample_iterator.go | 162 ++++++++++++++++++++++----- pkg/iter/sample_iterator_test.go | 114 ++++++++++++++++--- pkg/logcli/client/file.go | 11 +- pkg/logql/engine_test.go | 32 ++---- pkg/logql/range_vector_test.go | 2 +- pkg/logql/sharding.go | 4 +- pkg/logql/test_utils.go | 4 +- pkg/querier/querier.go | 8 +- pkg/querier/tail.go | 2 +- pkg/storage/batch.go | 12 +- 18 files changed, 573 insertions(+), 187 deletions(-) diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index aa4239175f..9740536571 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -818,7 +818,7 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi if ordered { it = iter.NewNonOverlappingIterator(blockItrs, "") } else { - it = iter.NewHeapIterator(ctx, blockItrs, direction) + it = iter.NewSortEntryIterator(blockItrs, direction) } return iter.NewTimeRangedIterator( @@ -851,7 +851,7 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi if ordered { return iter.NewNonOverlappingIterator(blockItrs, ""), nil } - return iter.NewHeapIterator(ctx, blockItrs, direction), nil + return iter.NewSortEntryIterator(blockItrs, direction), nil } // Iterator implements Chunk. 
@@ -886,7 +886,7 @@ func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time, if ordered { it = iter.NewNonOverlappingSampleIterator(its, "") } else { - it = iter.NewHeapSampleIterator(ctx, its) + it = iter.NewSortSampleIterator(its) } return iter.NewTimeRangedSampleIterator( @@ -1041,7 +1041,7 @@ func (hb *headBlock) Iterator(ctx context.Context, direction logproto.Direction, for _, stream := range streams { streamsResult = append(streamsResult, *stream) } - return iter.NewStreamsIterator(ctx, streamsResult, direction) + return iter.NewStreamsIterator(streamsResult, direction) } func (hb *headBlock) SampleIterator(ctx context.Context, mint, maxt int64, extractor log.StreamSampleExtractor) iter.SampleIterator { @@ -1082,7 +1082,7 @@ func (hb *headBlock) SampleIterator(ctx context.Context, mint, maxt int64, extra for _, s := range series { seriesRes = append(seriesRes, *s) } - return iter.SampleIteratorWithClose(iter.NewMultiSeriesIterator(ctx, seriesRes), func() error { + return iter.SampleIteratorWithClose(iter.NewMultiSeriesIterator(seriesRes), func() error { for _, s := range series { SamplesPool.Put(s.Samples) } diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index 4c5a416ab4..f7bf4ae6a0 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -257,7 +257,7 @@ func (hb *unorderedHeadBlock) Iterator( for _, stream := range streams { streamsResult = append(streamsResult, *stream) } - return iter.NewStreamsIterator(ctx, streamsResult, direction) + return iter.NewStreamsIterator(streamsResult, direction) } // nolint:unused @@ -308,7 +308,7 @@ func (hb *unorderedHeadBlock) SampleIterator( for _, s := range series { seriesRes = append(seriesRes, *s) } - return iter.SampleIteratorWithClose(iter.NewMultiSeriesIterator(ctx, seriesRes), func() error { + return iter.SampleIteratorWithClose(iter.NewMultiSeriesIterator(seriesRes), func() error { for _, s := range series { SamplesPool.Put(s.Samples) } diff --git 
a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 87226d3575..def6e16f74 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -561,7 +561,7 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie } instance := i.GetOrCreateInstance(instanceID) - itrs, err := instance.Query(ctx, logql.SelectLogParams{QueryRequest: req}) + it, err := instance.Query(ctx, logql.SelectLogParams{QueryRequest: req}) if err != nil { return err } @@ -577,17 +577,15 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie }} storeItr, err := i.store.SelectLogs(ctx, storeReq) if err != nil { + errUtil.LogErrorWithContext(ctx, "closing iterator", it.Close) return err } - - itrs = append(itrs, storeItr) + it = iter.NewMergeEntryIterator(ctx, []iter.EntryIterator{it, storeItr}, req.Direction) } - heapItr := iter.NewHeapIterator(ctx, itrs, req.Direction) - - defer errUtil.LogErrorWithContext(ctx, "closing iterator", heapItr.Close) + defer errUtil.LogErrorWithContext(ctx, "closing iterator", it.Close) - return sendBatches(ctx, heapItr, queryServer, req.Limit) + return sendBatches(ctx, it, queryServer, req.Limit) } // QuerySample the ingesters for series from logs matching a set of matchers. 
@@ -601,7 +599,7 @@ func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer log } instance := i.GetOrCreateInstance(instanceID) - itrs, err := instance.QuerySample(ctx, logql.SelectSampleParams{SampleQueryRequest: req}) + it, err := instance.QuerySample(ctx, logql.SelectSampleParams{SampleQueryRequest: req}) if err != nil { return err } @@ -615,17 +613,16 @@ func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer log }} storeItr, err := i.store.SelectSamples(ctx, storeReq) if err != nil { + errUtil.LogErrorWithContext(ctx, "closing iterator", it.Close) return err } - itrs = append(itrs, storeItr) + it = iter.NewMergeSampleIterator(ctx, []iter.SampleIterator{it, storeItr}) } - heapItr := iter.NewHeapSampleIterator(ctx, itrs) - - defer errUtil.LogErrorWithContext(ctx, "closing iterator", heapItr.Close) + defer errUtil.LogErrorWithContext(ctx, "closing iterator", it.Close) - return sendSampleBatches(ctx, heapItr, queryServer) + return sendSampleBatches(ctx, it, queryServer) } // boltdbShipperMaxLookBack returns a max look back period only if active index type is boltdb-shipper. 
diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index b21f896690..6bf66c3472 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -306,7 +306,7 @@ func (i *instance) getLabelsFromFingerprint(fp model.Fingerprint) labels.Labels return s.labels } -func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) ([]iter.EntryIterator, error) { +func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error) { expr, err := req.LogSelector() if err != nil { return nil, err @@ -341,10 +341,10 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) ([]iter return nil, err } - return iters, nil + return iter.NewSortEntryIterator(iters, req.Direction), nil } -func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams) ([]iter.SampleIterator, error) { +func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error) { expr, err := req.Expr() if err != nil { return nil, err @@ -386,7 +386,7 @@ func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams return nil, err } - return iters, nil + return iter.NewSortSampleIterator(iters), nil } // Label returns the label names or values depending on the given request diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index 5a21111023..5918e443cc 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -16,7 +16,6 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" - "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" loki_runtime "github.com/grafana/loki/pkg/runtime" @@ -497,7 +496,7 @@ func Test_Iterator(t *testing.T) { } // prepare iterators. 
- itrs, err := instance.Query(ctx, + it, err := instance.Query(ctx, logql.SelectLogParams{ QueryRequest: &logproto.QueryRequest{ Selector: `{job="3"} | logfmt`, @@ -509,12 +508,11 @@ func Test_Iterator(t *testing.T) { }, ) require.NoError(t, err) - heapItr := iter.NewHeapIterator(ctx, itrs, direction) // assert the order is preserved. var res *logproto.QueryResponse require.NoError(t, - sendBatches(ctx, heapItr, + sendBatches(ctx, it, fakeQueryServer( func(qr *logproto.QueryResponse) error { res = qr @@ -578,7 +576,7 @@ func Test_ChunkFilter(t *testing.T) { } // prepare iterators. - itrs, err := instance.Query(ctx, + it, err := instance.Query(ctx, logql.SelectLogParams{ QueryRequest: &logproto.QueryRequest{ Selector: `{job="3"}`, @@ -590,7 +588,6 @@ func Test_ChunkFilter(t *testing.T) { }, ) require.NoError(t, err) - it := iter.NewHeapIterator(ctx, itrs, direction) defer it.Close() for it.Next() { diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 55a881f568..87f1d13bfd 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -470,7 +470,7 @@ func (s *stream) Iterator(ctx context.Context, statsCtx *stats.Context, from, th if ordered { return iter.NewNonOverlappingIterator(iterators, ""), nil } - return iter.NewHeapIterator(ctx, iterators, direction), nil + return iter.NewSortEntryIterator(iterators, direction), nil } // Returns an SampleIterator. 
@@ -507,7 +507,7 @@ func (s *stream) SampleIterator(ctx context.Context, statsCtx *stats.Context, fr if ordered { return iter.NewNonOverlappingSampleIterator(iterators, ""), nil } - return iter.NewHeapSampleIterator(ctx, iterators), nil + return iter.NewSortSampleIterator(iterators), nil } func (s *stream) addTailer(t *tailer) { diff --git a/pkg/iter/entry_iterator.go b/pkg/iter/entry_iterator.go index 282b39adc0..3b8ec7d704 100644 --- a/pkg/iter/entry_iterator.go +++ b/pkg/iter/entry_iterator.go @@ -135,8 +135,8 @@ type HeapIterator interface { Push(EntryIterator) } -// heapIterator iterates over a heap of iterators. -type heapIterator struct { +// mergeEntryIterator iterates over a heap of iterators and merge duplicate entries. +type mergeEntryIterator struct { heap interface { heap.Interface Peek() EntryIterator @@ -145,16 +145,17 @@ type heapIterator struct { prefetched bool stats *stats.Context - tuples []tuple - currEntry logproto.Entry - currLabels string - errs []error + tuples []tuple + currEntry entryWithLabels + errs []error } -// NewHeapIterator returns a new iterator which uses a heap to merge together -// entries for multiple interators. -func NewHeapIterator(ctx context.Context, is []EntryIterator, direction logproto.Direction) HeapIterator { - result := &heapIterator{is: is, stats: stats.FromContext(ctx)} +// NewMergeEntryIterator returns a new iterator which uses a heap to merge together entries for multiple iterators and deduplicate entries if any. +// The iterator only order and merge entries across given `is` iterators, it does not merge entries within individual iterator. +// This means using this iterator with a single iterator will result in the same result as the input iterator. +// If you don't need to deduplicate entries, use `NewSortEntryIterator` instead. 
+func NewMergeEntryIterator(ctx context.Context, is []EntryIterator, direction logproto.Direction) HeapIterator { + result := &mergeEntryIterator{is: is, stats: stats.FromContext(ctx)} switch direction { case logproto.BACKWARD: result.heap = &iteratorMaxHeap{iteratorHeap: make([]EntryIterator, 0, len(is))} @@ -171,7 +172,7 @@ func NewHeapIterator(ctx context.Context, is []EntryIterator, direction logproto // prefetch iterates over all inner iterators to merge together, calls Next() on // each of them to prefetch the first entry and pushes of them - who are not // empty - to the heap -func (i *heapIterator) prefetch() { +func (i *mergeEntryIterator) prefetch() { if i.prefetched { return } @@ -192,7 +193,7 @@ func (i *heapIterator) prefetch() { // // If the iterator has no more entries or an error occur while advancing it, the iterator // is not pushed to the heap and any possible error captured, so that can be get via Error(). -func (i *heapIterator) requeue(ei EntryIterator, advanced bool) { +func (i *mergeEntryIterator) requeue(ei EntryIterator, advanced bool) { if advanced || ei.Next() { heap.Push(i.heap, ei) return @@ -204,7 +205,7 @@ func (i *heapIterator) requeue(ei EntryIterator, advanced bool) { util.LogError("closing iterator", ei.Close) } -func (i *heapIterator) Push(ei EntryIterator) { +func (i *mergeEntryIterator) Push(ei EntryIterator) { i.requeue(ei, false) } @@ -213,7 +214,7 @@ type tuple struct { EntryIterator } -func (i *heapIterator) Next() bool { +func (i *mergeEntryIterator) Next() bool { i.prefetch() if i.heap.Len() == 0 { @@ -222,8 +223,8 @@ func (i *heapIterator) Next() bool { // shortcut for the last iterator. if i.heap.Len() == 1 { - i.currEntry = i.heap.Peek().Entry() - i.currLabels = i.heap.Peek().Labels() + i.currEntry.entry = i.heap.Peek().Entry() + i.currEntry.labels = i.heap.Peek().Labels() if !i.heap.Peek().Next() { i.heap.Pop() } @@ -250,8 +251,8 @@ func (i *heapIterator) Next() bool { // shortcut if we have a single tuple. 
if len(i.tuples) == 1 { - i.currEntry = i.tuples[0].Entry - i.currLabels = i.tuples[0].Labels() + i.currEntry.entry = i.tuples[0].Entry + i.currEntry.labels = i.tuples[0].Labels() i.requeue(i.tuples[0].EntryIterator, false) i.tuples = i.tuples[:0] return true @@ -260,12 +261,12 @@ func (i *heapIterator) Next() bool { // Find in tuples which entry occurs most often which, due to quorum based // replication, is guaranteed to be the correct next entry. t := i.tuples[0] - i.currEntry = t.Entry - i.currLabels = t.Labels() + i.currEntry.entry = t.Entry + i.currEntry.labels = t.Labels() // Requeue the iterators, advancing them if they were consumed. for j := range i.tuples { - if i.tuples[j].Line != i.currEntry.Line { + if i.tuples[j].Line != i.currEntry.entry.Line { i.requeue(i.tuples[j].EntryIterator, true) continue } @@ -279,15 +280,15 @@ func (i *heapIterator) Next() bool { return true } -func (i *heapIterator) Entry() logproto.Entry { - return i.currEntry +func (i *mergeEntryIterator) Entry() logproto.Entry { + return i.currEntry.entry } -func (i *heapIterator) Labels() string { - return i.currLabels +func (i *mergeEntryIterator) Labels() string { + return i.currEntry.labels } -func (i *heapIterator) Error() error { +func (i *mergeEntryIterator) Error() error { switch len(i.errs) { case 0: return nil @@ -298,7 +299,7 @@ func (i *heapIterator) Error() error { } } -func (i *heapIterator) Close() error { +func (i *mergeEntryIterator) Close() error { for i.heap.Len() > 0 { if err := i.heap.Pop().(EntryIterator).Close(); err != nil { return err @@ -308,35 +309,143 @@ func (i *heapIterator) Close() error { return nil } -func (i *heapIterator) Peek() time.Time { +func (i *mergeEntryIterator) Peek() time.Time { i.prefetch() return i.heap.Peek().Entry().Timestamp } // Len returns the number of inner iterators on the heap, still having entries -func (i *heapIterator) Len() int { +func (i *mergeEntryIterator) Len() int { i.prefetch() return i.heap.Len() } +type 
entrySortIterator struct { + heap interface { + heap.Interface + Peek() EntryIterator + } + is []EntryIterator + prefetched bool + + currEntry entryWithLabels + errs []error +} + +// NewSortEntryIterator returns a new EntryIterator that sorts entries by timestamp (depending on the direction) across the input iterators. +// The iterator only orders entries across the given `is` iterators, it does not sort entries within an individual iterator. +// This means using this iterator with a single iterator will result in the same result as the input iterator. +func NewSortEntryIterator(is []EntryIterator, direction logproto.Direction) EntryIterator { + if len(is) == 0 { + return NoopIterator + } + if len(is) == 1 { + return is[0] + } + result := &entrySortIterator{is: is} + switch direction { + case logproto.BACKWARD: + result.heap = &iteratorMaxHeap{iteratorHeap: make([]EntryIterator, 0, len(is))} + case logproto.FORWARD: + result.heap = &iteratorMinHeap{iteratorHeap: make([]EntryIterator, 0, len(is))} + default: + panic("bad direction") + } + return result +} + +// init initializes the underlying heap +func (i *entrySortIterator) init() { + if i.prefetched { + return + } + + i.prefetched = true + for _, it := range i.is { + if it.Next() { + i.heap.Push(it) + continue + } + + if err := it.Error(); err != nil { + i.errs = append(i.errs, err) + } + util.LogError("closing iterator", it.Close) + } + heap.Init(i.heap) + + // We can now clear the list of input iterators to merge, given they have all + // been processed and the non empty ones have been pushed to the heap + i.is = nil +} + +func (i *entrySortIterator) Next() bool { + i.init() + + if i.heap.Len() == 0 { + return false + } + + next := i.heap.Peek() + i.currEntry.entry = next.Entry() + i.currEntry.labels = next.Labels() + // if the top iterator is empty, we remove it.
+ if !next.Next() { + heap.Pop(i.heap) + if err := next.Error(); err != nil { + i.errs = append(i.errs, err) + } + util.LogError("closing iterator", next.Close) + return true + } + if i.heap.Len() > 1 { + heap.Fix(i.heap, 0) + } + return true +} + +func (i *entrySortIterator) Entry() logproto.Entry { + return i.currEntry.entry +} + +func (i *entrySortIterator) Labels() string { + return i.currEntry.labels +} + +func (i *entrySortIterator) Error() error { + switch len(i.errs) { + case 0: + return nil + case 1: + return i.errs[0] + default: + return util.MultiError(i.errs) + } +} + +func (i *entrySortIterator) Close() error { + for i.heap.Len() > 0 { + if err := i.heap.Pop().(EntryIterator).Close(); err != nil { + return err + } + } + return nil +} + // NewStreamsIterator returns an iterator over logproto.Stream -func NewStreamsIterator(ctx context.Context, streams []logproto.Stream, direction logproto.Direction) EntryIterator { +func NewStreamsIterator(streams []logproto.Stream, direction logproto.Direction) EntryIterator { is := make([]EntryIterator, 0, len(streams)) for i := range streams { is = append(is, NewStreamIterator(streams[i])) } - return NewHeapIterator(ctx, is, direction) + return NewSortEntryIterator(is, direction) } // NewQueryResponseIterator returns an iterator over a QueryResponse. 
-func NewQueryResponseIterator(ctx context.Context, resp *logproto.QueryResponse, direction logproto.Direction) EntryIterator { - is := make([]EntryIterator, 0, len(resp.Streams)) - for i := range resp.Streams { - is = append(is, NewStreamIterator(resp.Streams[i])) - } - return NewHeapIterator(ctx, is, direction) +func NewQueryResponseIterator(resp *logproto.QueryResponse, direction logproto.Direction) EntryIterator { + return NewStreamsIterator(resp.Streams, direction) } type queryClientIterator struct { @@ -365,7 +474,7 @@ func (i *queryClientIterator) Next() bool { return false } stats.JoinIngesters(ctx, batch.Stats) - i.curr = NewQueryResponseIterator(ctx, batch, i.direction) + i.curr = NewQueryResponseIterator(batch, i.direction) } return true diff --git a/pkg/iter/entry_iterator_test.go b/pkg/iter/entry_iterator_test.go index 459c39d6b3..3a563432d1 100644 --- a/pkg/iter/entry_iterator_test.go +++ b/pkg/iter/entry_iterator_test.go @@ -45,7 +45,7 @@ func TestIterator(t *testing.T) { // Test dedupe of overlapping iterators with the heap iterator. { - iterator: NewHeapIterator(context.Background(), []EntryIterator{ + iterator: NewMergeEntryIterator(context.Background(), []EntryIterator{ mkStreamIterator(offset(0, identity), defaultLabels), mkStreamIterator(offset(testSize/2, identity), defaultLabels), mkStreamIterator(offset(testSize, identity), defaultLabels), @@ -57,7 +57,7 @@ func TestIterator(t *testing.T) { // Test dedupe of overlapping iterators with the heap iterator (backward). { - iterator: NewHeapIterator(context.Background(), []EntryIterator{ + iterator: NewMergeEntryIterator(context.Background(), []EntryIterator{ mkStreamIterator(inverse(offset(0, identity)), defaultLabels), mkStreamIterator(inverse(offset(-testSize/2, identity)), defaultLabels), mkStreamIterator(inverse(offset(-testSize, identity)), defaultLabels), @@ -69,7 +69,7 @@ func TestIterator(t *testing.T) { // Test dedupe of entries with the same timestamp but different entries. 
{ - iterator: NewHeapIterator(context.Background(), []EntryIterator{ + iterator: NewMergeEntryIterator(context.Background(), []EntryIterator{ mkStreamIterator(offset(0, constant(0)), defaultLabels), mkStreamIterator(offset(0, constant(0)), defaultLabels), mkStreamIterator(offset(testSize, constant(0)), defaultLabels), @@ -110,7 +110,7 @@ func TestIteratorMultipleLabels(t *testing.T) { }{ // Test merging with differing labels but same timestamps and values. { - iterator: NewHeapIterator(context.Background(), []EntryIterator{ + iterator: NewMergeEntryIterator(context.Background(), []EntryIterator{ mkStreamIterator(identity, "{foobar: \"baz1\"}"), mkStreamIterator(identity, "{foobar: \"baz2\"}"), }, logproto.FORWARD), @@ -128,7 +128,7 @@ func TestIteratorMultipleLabels(t *testing.T) { // Test merging with differing labels but all the same timestamps and different values. { - iterator: NewHeapIterator(context.Background(), []EntryIterator{ + iterator: NewMergeEntryIterator(context.Background(), []EntryIterator{ mkStreamIterator(constant(0), "{foobar: \"baz1\"}"), mkStreamIterator(constant(0), "{foobar: \"baz2\"}"), }, logproto.FORWARD), @@ -158,7 +158,7 @@ func TestIteratorMultipleLabels(t *testing.T) { } } -func TestHeapIteratorPrefetch(t *testing.T) { +func TestMergeIteratorPrefetch(t *testing.T) { t.Parallel() type tester func(t *testing.T, i HeapIterator) @@ -182,7 +182,7 @@ func TestHeapIteratorPrefetch(t *testing.T) { t.Run(testName, func(t *testing.T) { t.Parallel() - i := NewHeapIterator(context.Background(), []EntryIterator{ + i := NewMergeEntryIterator(context.Background(), []EntryIterator{ mkStreamIterator(identity, "{foobar: \"baz1\"}"), mkStreamIterator(identity, "{foobar: \"baz2\"}"), }, logproto.FORWARD) @@ -234,7 +234,7 @@ func inverse(g generator) generator { } } -func TestHeapIteratorDeduplication(t *testing.T) { +func TestMergeIteratorDeduplication(t *testing.T) { foo := logproto.Stream{ Labels: `{app="foo"}`, Entries: []logproto.Entry{ @@ -272,7 
+272,7 @@ func TestHeapIteratorDeduplication(t *testing.T) { require.NoError(t, it.Error()) } // forward iteration - it := NewHeapIterator(context.Background(), []EntryIterator{ + it := NewMergeEntryIterator(context.Background(), []EntryIterator{ NewStreamIterator(foo), NewStreamIterator(bar), NewStreamIterator(foo), @@ -284,7 +284,7 @@ func TestHeapIteratorDeduplication(t *testing.T) { assertIt(it, false, len(foo.Entries)) // backward iteration - it = NewHeapIterator(context.Background(), []EntryIterator{ + it = NewMergeEntryIterator(context.Background(), []EntryIterator{ mustReverseStreamIterator(NewStreamIterator(foo)), mustReverseStreamIterator(NewStreamIterator(bar)), mustReverseStreamIterator(NewStreamIterator(foo)), @@ -308,8 +308,8 @@ func TestReverseIterator(t *testing.T) { itr1 := mkStreamIterator(inverse(offset(testSize, identity)), defaultLabels) itr2 := mkStreamIterator(inverse(offset(testSize, identity)), "{foobar: \"bazbar\"}") - heapIterator := NewHeapIterator(context.Background(), []EntryIterator{itr1, itr2}, logproto.BACKWARD) - reversedIter, err := NewReversedIter(heapIterator, testSize, false) + mergeIterator := NewMergeEntryIterator(context.Background(), []EntryIterator{itr1, itr2}, logproto.BACKWARD) + reversedIter, err := NewReversedIter(mergeIterator, testSize, false) require.NoError(t, err) for i := int64((testSize / 2) + 1); i <= testSize; i++ { @@ -347,8 +347,8 @@ func TestReverseEntryIteratorUnlimited(t *testing.T) { itr1 := mkStreamIterator(offset(testSize, identity), defaultLabels) itr2 := mkStreamIterator(offset(testSize, identity), "{foobar: \"bazbar\"}") - heapIterator := NewHeapIterator(context.Background(), []EntryIterator{itr1, itr2}, logproto.BACKWARD) - reversedIter, err := NewReversedIter(heapIterator, 0, false) + mergeIterator := NewMergeEntryIterator(context.Background(), []EntryIterator{itr1, itr2}, logproto.BACKWARD) + reversedIter, err := NewReversedIter(mergeIterator, 0, false) require.NoError(t, err) var ct int @@ 
-546,7 +546,7 @@ func Test_DuplicateCount(t *testing.T) { } { t.Run(test.name, func(t *testing.T) { _, ctx := stats.NewContext(context.Background()) - it := NewHeapIterator(ctx, test.iters, test.direction) + it := NewMergeEntryIterator(ctx, test.iters, test.direction) defer it.Close() for it.Next() { } @@ -636,7 +636,7 @@ func TestNonOverlappingClose(t *testing.T) { require.Equal(t, true, b.closed.Load()) } -func BenchmarkHeapIterator(b *testing.B) { +func BenchmarkSortIterator(b *testing.B) { var ( ctx = context.Background() streams []logproto.Stream @@ -658,18 +658,130 @@ func BenchmarkHeapIterator(b *testing.B) { streams[i], streams[j] = streams[j], streams[i] }) - b.ResetTimer() - for i := 0; i < b.N; i++ { - b.StopTimer() - var itrs []EntryIterator - for i := 0; i < streamsCount; i++ { - itrs = append(itrs, NewStreamIterator(streams[i])) + b.Run("merge sort", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + b.StopTimer() + var itrs []EntryIterator + for i := 0; i < streamsCount; i++ { + itrs = append(itrs, NewStreamIterator(streams[i])) + } + b.StartTimer() + it := NewMergeEntryIterator(ctx, itrs, logproto.BACKWARD) + for it.Next() { + it.Entry() + } + it.Close() } - b.StartTimer() - it := NewHeapIterator(ctx, itrs, logproto.BACKWARD) + }) + + b.Run("sort", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + b.StopTimer() + var itrs []EntryIterator + for i := 0; i < streamsCount; i++ { + itrs = append(itrs, NewStreamIterator(streams[i])) + } + b.StartTimer() + it := NewSortEntryIterator(itrs, logproto.BACKWARD) + for it.Next() { + it.Entry() + } + it.Close() + } + }) +} + +func Test_EntrySortIterator(t *testing.T) { + t.Run("backward", func(t *testing.T) { + t.Parallel() + it := NewSortEntryIterator( + []EntryIterator{ + NewStreamIterator(logproto.Stream{ + Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, 5)}, + {Timestamp: time.Unix(0, 3)}, + {Timestamp: time.Unix(0, 0)}, + }, + Labels: `{foo="bar"}`, + }), + 
NewStreamIterator(logproto.Stream{ + Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, 4)}, + {Timestamp: time.Unix(0, 2)}, + {Timestamp: time.Unix(0, 1)}, + }, + Labels: `{foo="buzz"}`, + }), + }, logproto.BACKWARD) + var i int64 = 5 + defer it.Close() for it.Next() { - it.Entry() + require.Equal(t, time.Unix(0, i), it.Entry().Timestamp) + i-- } - it.Close() - } + }) + t.Run("forward", func(t *testing.T) { + t.Parallel() + it := NewSortEntryIterator( + []EntryIterator{ + NewStreamIterator(logproto.Stream{ + Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, 0)}, + {Timestamp: time.Unix(0, 3)}, + {Timestamp: time.Unix(0, 5)}, + }, + Labels: `{foo="bar"}`, + }), + NewStreamIterator(logproto.Stream{ + Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, 1)}, + {Timestamp: time.Unix(0, 2)}, + {Timestamp: time.Unix(0, 4)}, + }, + Labels: `{foo="buzz"}`, + }), + }, logproto.FORWARD) + var i int64 + defer it.Close() + for it.Next() { + require.Equal(t, time.Unix(0, i), it.Entry().Timestamp) + i++ + } + }) + t.Run("forward sort by stream", func(t *testing.T) { + t.Parallel() + it := NewSortEntryIterator( + []EntryIterator{ + NewStreamIterator(logproto.Stream{ + Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, 0)}, + {Timestamp: time.Unix(0, 3)}, + {Timestamp: time.Unix(0, 5)}, + }, + Labels: `b`, + }), + NewStreamIterator(logproto.Stream{ + Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, 0)}, + {Timestamp: time.Unix(0, 1)}, + {Timestamp: time.Unix(0, 2)}, + {Timestamp: time.Unix(0, 4)}, + }, + Labels: `a`, + }), + }, logproto.FORWARD) + // The first entry appears in both so we expect it to be sorted by Labels. 
+ require.True(t, it.Next()) + require.Equal(t, time.Unix(0, 0), it.Entry().Timestamp) + require.Equal(t, `a`, it.Labels()) + + var i int64 + defer it.Close() + for it.Next() { + require.Equal(t, time.Unix(0, i), it.Entry().Timestamp) + i++ + } + }) } diff --git a/pkg/iter/sample_iterator.go b/pkg/iter/sample_iterator.go index 6d25610bbe..70145c23ac 100644 --- a/pkg/iter/sample_iterator.go +++ b/pkg/iter/sample_iterator.go @@ -138,24 +138,25 @@ func (h sampleIteratorHeap) Less(i, j int) bool { } } -// heapSampleIterator iterates over a heap of iterators. -type heapSampleIterator struct { +// mergeSampleIterator iterates over a heap of iterators by merging samples. +type mergeSampleIterator struct { heap *sampleIteratorHeap is []SampleIterator prefetched bool stats *stats.Context - tuples []sampletuple - curr logproto.Sample - currLabels string - errs []error + tuples []sampletuple + curr sampleWithLabels + errs []error } -// NewHeapSampleIterator returns a new iterator which uses a heap to merge together -// entries for multiple iterators. -func NewHeapSampleIterator(ctx context.Context, is []SampleIterator) SampleIterator { +// NewMergeSampleIterator returns a new iterator which uses a heap to merge together samples for multiple iterators and deduplicate if any. +// The iterator only order and merge entries across given `is` iterators, it does not merge entries within individual iterator. +// This means using this iterator with a single iterator will result in the same result as the input iterator. +// If you don't need to deduplicate sample, use `NewSortSampleIterator` instead. 
+func NewMergeSampleIterator(ctx context.Context, is []SampleIterator) SampleIterator { h := sampleIteratorHeap(make([]SampleIterator, 0, len(is))) - return &heapSampleIterator{ + return &mergeSampleIterator{ stats: stats.FromContext(ctx), is: is, heap: &h, @@ -166,7 +167,7 @@ func NewHeapSampleIterator(ctx context.Context, is []SampleIterator) SampleItera // prefetch iterates over all inner iterators to merge together, calls Next() on // each of them to prefetch the first entry and pushes of them - who are not // empty - to the heap -func (i *heapSampleIterator) prefetch() { +func (i *mergeSampleIterator) prefetch() { if i.prefetched { return } @@ -187,7 +188,7 @@ func (i *heapSampleIterator) prefetch() { // // If the iterator has no more entries or an error occur while advancing it, the iterator // is not pushed to the heap and any possible error captured, so that can be get via Error(). -func (i *heapSampleIterator) requeue(ei SampleIterator, advanced bool) { +func (i *mergeSampleIterator) requeue(ei SampleIterator, advanced bool) { if advanced || ei.Next() { heap.Push(i.heap, ei) return @@ -204,7 +205,7 @@ type sampletuple struct { SampleIterator } -func (i *heapSampleIterator) Next() bool { +func (i *mergeSampleIterator) Next() bool { i.prefetch() if i.heap.Len() == 0 { @@ -213,8 +214,8 @@ func (i *heapSampleIterator) Next() bool { // shortcut for the last iterator. 
if i.heap.Len() == 1 { - i.curr = i.heap.Peek().Sample() - i.currLabels = i.heap.Peek().Labels() + i.curr.Sample = i.heap.Peek().Sample() + i.curr.labels = i.heap.Peek().Labels() if !i.heap.Peek().Next() { i.heap.Pop() } @@ -239,8 +240,8 @@ func (i *heapSampleIterator) Next() bool { }) } - i.curr = i.tuples[0].Sample - i.currLabels = i.tuples[0].Labels() + i.curr.Sample = i.tuples[0].Sample + i.curr.labels = i.tuples[0].Labels() t := i.tuples[0] if len(i.tuples) == 1 { i.requeue(i.tuples[0].SampleIterator, false) @@ -263,15 +264,15 @@ func (i *heapSampleIterator) Next() bool { return true } -func (i *heapSampleIterator) Sample() logproto.Sample { - return i.curr +func (i *mergeSampleIterator) Sample() logproto.Sample { + return i.curr.Sample } -func (i *heapSampleIterator) Labels() string { - return i.currLabels +func (i *mergeSampleIterator) Labels() string { + return i.curr.labels } -func (i *heapSampleIterator) Error() error { +func (i *mergeSampleIterator) Error() error { switch len(i.errs) { case 0: return nil @@ -282,7 +283,7 @@ func (i *heapSampleIterator) Error() error { } } -func (i *heapSampleIterator) Close() error { +func (i *mergeSampleIterator) Close() error { for i.heap.Len() > 0 { if err := i.heap.Pop().(SampleIterator).Close(); err != nil { return err @@ -292,6 +293,111 @@ func (i *heapSampleIterator) Close() error { return nil } +// sortSampleIterator iterates over a heap of iterators by sorting samples. +type sortSampleIterator struct { + heap *sampleIteratorHeap + is []SampleIterator + prefetched bool + + curr sampleWithLabels + errs []error +} + +// NewSortSampleIterator returns a new SampleIterator that sorts samples by ascending timestamp the input iterators. +// The iterator only order sample across given `is` iterators, it does not sort samples within individual iterator. +// This means using this iterator with a single iterator will result in the same result as the input iterator. 
+func NewSortSampleIterator(is []SampleIterator) SampleIterator { + if len(is) == 0 { + return NoopIterator + } + if len(is) == 1 { + return is[0] + } + h := sampleIteratorHeap(make([]SampleIterator, 0, len(is))) + return &sortSampleIterator{ + is: is, + heap: &h, + } +} + +// init initialize the underlaying heap +func (i *sortSampleIterator) init() { + if i.prefetched { + return + } + + i.prefetched = true + for _, it := range i.is { + if it.Next() { + i.heap.Push(it) + continue + } + + if err := it.Error(); err != nil { + i.errs = append(i.errs, err) + } + util.LogError("closing iterator", it.Close) + } + heap.Init(i.heap) + + // We can now clear the list of input iterators to merge, given they have all + // been processed and the non empty ones have been pushed to the heap + i.is = nil +} + +func (i *sortSampleIterator) Next() bool { + i.init() + + if i.heap.Len() == 0 { + return false + } + + next := i.heap.Peek() + i.curr.Sample = next.Sample() + i.curr.labels = next.Labels() + // if the top iterator is empty, we remove it. 
+ if !next.Next() { + heap.Pop(i.heap) + if err := next.Error(); err != nil { + i.errs = append(i.errs, err) + } + util.LogError("closing iterator", next.Close) + return true + } + if i.heap.Len() > 1 { + heap.Fix(i.heap, 0) + } + return true +} + +func (i *sortSampleIterator) Sample() logproto.Sample { + return i.curr.Sample +} + +func (i *sortSampleIterator) Labels() string { + return i.curr.labels +} + +func (i *sortSampleIterator) Error() error { + switch len(i.errs) { + case 0: + return nil + case 1: + return i.errs[0] + default: + return util.MultiError(i.errs) + } +} + +func (i *sortSampleIterator) Close() error { + for i.heap.Len() > 0 { + if err := i.heap.Pop().(SampleIterator).Close(); err != nil { + return err + } + } + return nil +} + type sampleQueryClientIterator struct { client QuerySampleClient err error @@ -323,7 +429,7 @@ func (i *sampleQueryClientIterator) Next() bool { return false } stats.JoinIngesters(ctx, batch.Stats) - i.curr = NewSampleQueryResponseIterator(ctx, batch) + i.curr = NewSampleQueryResponseIterator(batch) } return true } @@ -345,8 +451,8 @@ func (i *sampleQueryClientIterator) Close() error { } // NewSampleQueryResponseIterator returns an iterator over a SampleQueryResponse. 
-func NewSampleQueryResponseIterator(ctx context.Context, resp *logproto.SampleQueryResponse) SampleIterator { - return NewMultiSeriesIterator(ctx, resp.Series) +func NewSampleQueryResponseIterator(resp *logproto.SampleQueryResponse) SampleIterator { + return NewMultiSeriesIterator(resp.Series) } type seriesIterator struct { @@ -386,12 +492,12 @@ func SampleIteratorWithClose(it SampleIterator, closeFn func() error) SampleIter } // NewMultiSeriesIterator returns an iterator over multiple logproto.Series -func NewMultiSeriesIterator(ctx context.Context, series []logproto.Series) SampleIterator { +func NewMultiSeriesIterator(series []logproto.Series) SampleIterator { is := make([]SampleIterator, 0, len(series)) for i := range series { is = append(is, NewSeriesIterator(series[i])) } - return NewHeapSampleIterator(ctx, is) + return NewSortSampleIterator(is) } // NewSeriesIterator iterates over sample in a series. diff --git a/pkg/iter/sample_iterator_test.go b/pkg/iter/sample_iterator_test.go index ed19919e3f..b1952f6376 100644 --- a/pkg/iter/sample_iterator_test.go +++ b/pkg/iter/sample_iterator_test.go @@ -104,8 +104,8 @@ var carSeries = logproto.Series{ }, } -func TestNewHeapSampleIterator(t *testing.T) { - it := NewHeapSampleIterator(context.Background(), +func TestNewMergeSampleIterator(t *testing.T) { + it := NewMergeSampleIterator(context.Background(), []SampleIterator{ NewSeriesIterator(varSeries), NewSeriesIterator(carSeries), @@ -194,7 +194,7 @@ func TestReadSampleBatch(t *testing.T) { require.Equal(t, uint32(1), size) require.NoError(t, err) - res, size, err = ReadSampleBatch(NewMultiSeriesIterator(context.Background(), []logproto.Series{carSeries, varSeries}), 100) + res, size, err = ReadSampleBatch(NewMultiSeriesIterator([]logproto.Series{carSeries, varSeries}), 100) require.ElementsMatch(t, []logproto.Series{carSeries, varSeries}, res.Series) require.Equal(t, uint32(6), size) require.NoError(t, err) @@ -277,7 +277,7 @@ func 
TestSampleIteratorWithClose_ReturnsError(t *testing.T) { assert.Equal(t, err, err2) } -func BenchmarkHeapSampleIterator(b *testing.B) { +func BenchmarkSortSampleIterator(b *testing.B) { var ( ctx = context.Background() series []logproto.Series @@ -299,18 +299,102 @@ func BenchmarkHeapSampleIterator(b *testing.B) { series[i], series[j] = series[j], series[i] }) - b.ResetTimer() - for i := 0; i < b.N; i++ { - b.StopTimer() - var itrs []SampleIterator - for i := 0; i < seriesCount; i++ { - itrs = append(itrs, NewSeriesIterator(series[i])) + b.Run("merge", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + b.StopTimer() + var itrs []SampleIterator + for i := 0; i < seriesCount; i++ { + itrs = append(itrs, NewSeriesIterator(series[i])) + } + b.StartTimer() + it := NewMergeSampleIterator(ctx, itrs) + for it.Next() { + it.Sample() + } + it.Close() } - b.StartTimer() - it := NewHeapSampleIterator(ctx, itrs) + }) + b.Run("sort", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + b.StopTimer() + var itrs []SampleIterator + for i := 0; i < seriesCount; i++ { + itrs = append(itrs, NewSeriesIterator(series[i])) + } + b.StartTimer() + it := NewSortSampleIterator(itrs) + for it.Next() { + it.Sample() + } + it.Close() + } + }) +} + +func Test_SampleSortIterator(t *testing.T) { + t.Run("forward", func(t *testing.T) { + t.Parallel() + it := NewSortSampleIterator( + []SampleIterator{ + NewSeriesIterator(logproto.Series{ + Samples: []logproto.Sample{ + {Timestamp: 0}, + {Timestamp: 3}, + {Timestamp: 5}, + }, + Labels: `{foo="bar"}`, + }), + NewSeriesIterator(logproto.Series{ + Samples: []logproto.Sample{ + {Timestamp: 1}, + {Timestamp: 2}, + {Timestamp: 4}, + }, + Labels: `{foo="bar"}`, + }), + }) + var i int64 + defer it.Close() for it.Next() { - it.Sample() + require.Equal(t, i, it.Sample().Timestamp) + i++ } - it.Close() - } + }) + t.Run("forward sort by stream", func(t *testing.T) { + t.Parallel() + it := NewSortSampleIterator( + 
[]SampleIterator{ + NewSeriesIterator(logproto.Series{ + Samples: []logproto.Sample{ + {Timestamp: 0}, + {Timestamp: 3}, + {Timestamp: 5}, + }, + Labels: `b`, + }), + NewSeriesIterator(logproto.Series{ + Samples: []logproto.Sample{ + {Timestamp: 0}, + {Timestamp: 1}, + {Timestamp: 2}, + {Timestamp: 4}, + }, + Labels: `a`, + }), + }) + + // The first entry appears in both so we expect it to be sorted by Labels. + require.True(t, it.Next()) + require.Equal(t, int64(0), it.Sample().Timestamp) + require.Equal(t, `a`, it.Labels()) + + var i int64 + defer it.Close() + for it.Next() { + require.Equal(t, i, it.Sample().Timestamp) + i++ + } + }) } diff --git a/pkg/logcli/client/file.go b/pkg/logcli/client/file.go index 8debdf58e1..f8493d78db 100644 --- a/pkg/logcli/client/file.go +++ b/pkg/logcli/client/file.go @@ -32,9 +32,7 @@ const ( defaultMaxFileSize = 20 * (1 << 20) // 20MB ) -var ( - ErrNotSupported = errors.New("not supported") -) +var ErrNotSupported = errors.New("not supported") // FileClient is a type of LogCLI client that do LogQL on log lines from // the given file directly, instead get log lines from Loki servers. 
@@ -63,7 +61,6 @@ func NewFileClient(r io.ReadCloser) *FileClient { labels: []string{defaultLabelKey}, labelValues: []string{defaultLabelValue}, } - } func (f *FileClient) Query(q string, limit int, t time.Time, direction logproto.Direction, quiet bool) (*loghttp.QueryResponse, error) { @@ -198,7 +195,7 @@ type querier struct { labels labels.Labels } -func (q *querier) SelectLogs(ctx context.Context, params logql.SelectLogParams) (iter.EntryIterator, error) { +func (q *querier) SelectLogs(_ context.Context, params logql.SelectLogParams) (iter.EntryIterator, error) { expr, err := params.LogSelector() if err != nil { return nil, fmt.Errorf("failed to extract selector for logs: %w", err) @@ -207,7 +204,7 @@ func (q *querier) SelectLogs(ctx context.Context, params logql.SelectLogParams) if err != nil { return nil, fmt.Errorf("failed to extract pipeline for logs: %w", err) } - return newFileIterator(ctx, q.r, params, pipeline.ForStream(q.labels)) + return newFileIterator(q.r, params, pipeline.ForStream(q.labels)) } func (q *querier) SelectSamples(ctx context.Context, params logql.SelectSampleParams) (iter.SampleIterator, error) { @@ -215,7 +212,6 @@ func (q *querier) SelectSamples(ctx context.Context, params logql.SelectSamplePa } func newFileIterator( - ctx context.Context, r io.Reader, params logql.SelectLogParams, pipeline logqllog.StreamPipeline, @@ -278,7 +274,6 @@ func newFileIterator( } return iter.NewStreamsIterator( - ctx, streamResult, params.Direction, ), nil diff --git a/pkg/logql/engine_test.go b/pkg/logql/engine_test.go index 544bcac4cd..5b6935af11 100644 --- a/pkg/logql/engine_test.go +++ b/pkg/logql/engine_test.go @@ -1340,7 +1340,7 @@ func TestEngine_RangeQuery(t *testing.T) { `topk(1,rate(({app=~"foo|bar"} |~".+bar")[1m])) by (app)`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]logproto.Series{ { - newSeries(testSize, factor(10, identity), `{app="foo"}`), newSeries(testSize, factor(15, identity), `{app="fuzz"}`), 
+ newSeries(testSize, factor(10, identity), `{app="foo"}`), newSeries(testSize, factor(5, identity), `{app="fuzz"}`), newSeries(testSize, identity, `{app="buzz"}`), }, }, @@ -2085,11 +2085,11 @@ type errorIteratorQuerier struct { } func (e errorIteratorQuerier) SelectLogs(ctx context.Context, p SelectLogParams) (iter.EntryIterator, error) { - return iter.NewHeapIterator(ctx, e.entries, p.Direction), nil + return iter.NewSortEntryIterator(e.entries, p.Direction), nil } func (e errorIteratorQuerier) SelectSamples(ctx context.Context, p SelectSampleParams) (iter.SampleIterator, error) { - return iter.NewHeapSampleIterator(ctx, e.samples), nil + return iter.NewSortSampleIterator(e.samples), nil } func TestStepEvaluator_Error(t *testing.T) { @@ -2269,11 +2269,6 @@ func getLocalQuerier(size int64) Querier { newSeries(size, identity, `{app="bar",bar="foo"}`), newSeries(size, identity, `{app="bar",bar="bazz"}`), newSeries(size, identity, `{app="bar",bar="fuzz"}`), - // some duplicates - newSeries(size, identity, `{app="foo"}`), - newSeries(size, identity, `{app="bar"}`), - newSeries(size, identity, `{app="bar",bar="bazz"}`), - newSeries(size, identity, `{app="bar"}`), }, }, streams: map[string][]logproto.Stream{ @@ -2286,11 +2281,6 @@ func getLocalQuerier(size int64) Querier { newStream(size, identity, `{app="bar",bar="foo"}`), newStream(size, identity, `{app="bar",bar="bazz"}`), newStream(size, identity, `{app="bar",bar="fuzz"}`), - // some duplicates - newStream(size, identity, `{app="foo"}`), - newStream(size, identity, `{app="bar"}`), - newStream(size, identity, `{app="bar",bar="bazz"}`), - newStream(size, identity, `{app="bar"}`), }, }, } @@ -2331,7 +2321,7 @@ func newQuerierRecorder(t *testing.T, data interface{}, params interface{}) *que func (q *querierRecorder) SelectLogs(ctx context.Context, p SelectLogParams) (iter.EntryIterator, error) { if !q.match { for _, s := range q.streams { - return iter.NewStreamsIterator(ctx, s, p.Direction), nil + return 
iter.NewStreamsIterator(s, p.Direction), nil } } recordID := paramsID(p) @@ -2339,17 +2329,13 @@ func (q *querierRecorder) SelectLogs(ctx context.Context, p SelectLogParams) (it if !ok { return nil, fmt.Errorf("no streams found for id: %s has: %+v", recordID, q.streams) } - iters := make([]iter.EntryIterator, 0, len(streams)) - for _, s := range streams { - iters = append(iters, iter.NewStreamIterator(s)) - } - return iter.NewHeapIterator(ctx, iters, p.Direction), nil + return iter.NewStreamsIterator(streams, p.Direction), nil } func (q *querierRecorder) SelectSamples(ctx context.Context, p SelectSampleParams) (iter.SampleIterator, error) { if !q.match { for _, s := range q.series { - return iter.NewMultiSeriesIterator(ctx, s), nil + return iter.NewMultiSeriesIterator(s), nil } } recordID := paramsID(p) @@ -2360,11 +2346,7 @@ func (q *querierRecorder) SelectSamples(ctx context.Context, p SelectSampleParam if !ok { return nil, fmt.Errorf("no series found for id: %s has: %+v", recordID, q.series) } - iters := make([]iter.SampleIterator, 0, len(series)) - for _, s := range series { - iters = append(iters, iter.NewSeriesIterator(s)) - } - return iter.NewHeapSampleIterator(ctx, iters), nil + return iter.NewMultiSeriesIterator(series), nil } func paramsID(p interface{}) string { diff --git a/pkg/logql/range_vector_test.go b/pkg/logql/range_vector_test.go index fca777db87..74583781c9 100644 --- a/pkg/logql/range_vector_test.go +++ b/pkg/logql/range_vector_test.go @@ -34,7 +34,7 @@ var ( ) func newSampleIterator() iter.SampleIterator { - return iter.NewHeapSampleIterator(context.Background(), []iter.SampleIterator{ + return iter.NewSortSampleIterator([]iter.SampleIterator{ iter.NewSeriesIterator(logproto.Series{ Labels: labelFoo.String(), Samples: samples, diff --git a/pkg/logql/sharding.go b/pkg/logql/sharding.go index aca924b9a9..0e4ad4f72f 100644 --- a/pkg/logql/sharding.go +++ b/pkg/logql/sharding.go @@ -324,7 +324,7 @@ func (ev *DownstreamEvaluator) Iterator( xs = 
append(xs, iter) } - return iter.NewHeapIterator(ctx, xs, params.Direction()), nil + return iter.NewSortEntryIterator(xs, params.Direction()), nil default: return nil, EvaluatorUnsupportedType(expr, ev) @@ -401,5 +401,5 @@ func ResultIterator(res logqlmodel.Result, params Params) (iter.EntryIterator, e if !ok { return nil, fmt.Errorf("unexpected type (%s) for ResultIterator; expected %s", res.Data.Type(), logqlmodel.ValueTypeStreams) } - return iter.NewStreamsIterator(context.Background(), streams, params.Direction()), nil + return iter.NewStreamsIterator(streams, params.Direction()), nil } diff --git a/pkg/logql/test_utils.go b/pkg/logql/test_utils.go index e5a0e7f57f..6f0cd94f13 100644 --- a/pkg/logql/test_utils.go +++ b/pkg/logql/test_utils.go @@ -90,7 +90,7 @@ outer: } } - return iter.NewHeapIterator(ctx, streamIters, req.Direction), nil + return iter.NewSortEntryIterator(streamIters, req.Direction), nil } func processStream(in []logproto.Stream, pipeline log.Pipeline) []logproto.Stream { @@ -200,7 +200,7 @@ outer: filtered := processSeries(matched, extractor) return iter.NewTimeRangedSampleIterator( - iter.NewMultiSeriesIterator(ctx, filtered), + iter.NewMultiSeriesIterator(filtered), req.Start.UnixNano(), req.End.UnixNano()+1, ), nil diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index ee066c556e..052c192b51 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -142,8 +142,10 @@ func (q *Querier) SelectLogs(ctx context.Context, params logql.SelectLogParams) iters = append(iters, storeIter) } - - return iter.NewHeapIterator(ctx, iters, params.Direction), nil + if len(iters) == 1 { + return iters[0], nil + } + return iter.NewMergeEntryIterator(ctx, iters, params.Direction), nil } func (q *Querier) SelectSamples(ctx context.Context, params logql.SelectSampleParams) (iter.SampleIterator, error) { @@ -185,7 +187,7 @@ func (q *Querier) SelectSamples(ctx context.Context, params logql.SelectSamplePa iters = append(iters, storeIter) } - 
return iter.NewHeapSampleIterator(ctx, iters), nil + return iter.NewMergeSampleIterator(ctx, iters), nil } func (q *Querier) buildQueryIntervals(queryStart, queryEnd time.Time) (*interval, *interval) { diff --git a/pkg/querier/tail.go b/pkg/querier/tail.go index 93938c44ec..a6454c0535 100644 --- a/pkg/querier/tail.go +++ b/pkg/querier/tail.go @@ -272,7 +272,7 @@ func newTailer( waitEntryThrottle time.Duration, ) *Tailer { t := Tailer{ - openStreamIterator: iter.NewHeapIterator(context.Background(), []iter.EntryIterator{historicEntries}, logproto.FORWARD), + openStreamIterator: iter.NewMergeEntryIterator(context.Background(), []iter.EntryIterator{historicEntries}, logproto.FORWARD), querierTailClients: querierTailClients, delayFor: delayFor, responseChan: make(chan *loghttp.TailResponse, maxBufferedTailResponses), diff --git a/pkg/storage/batch.go b/pkg/storage/batch.go index e3778ba9db..ddc8f5fa80 100644 --- a/pkg/storage/batch.go +++ b/pkg/storage/batch.go @@ -396,8 +396,10 @@ func (it *logBatchIterator) newChunksIterator(b *chunkBatch) (iter.EntryIterator if err != nil { return nil, err } - - return iter.NewHeapIterator(it.ctx, iters, it.direction), nil + if len(iters) == 1 { + return iters[0], nil + } + return iter.NewSortEntryIterator(iters, it.direction), nil } func (it *logBatchIterator) buildIterators(chks map[model.Fingerprint][][]*LazyChunk, from, through time.Time, nextChunk *LazyChunk) ([]iter.EntryIterator, error) { @@ -440,7 +442,7 @@ func (it *logBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, through result = append(result, iter.NewNonOverlappingIterator(iterators, "")) } - return iter.NewHeapIterator(it.ctx, result, it.direction), nil + return iter.NewMergeEntryIterator(it.ctx, result, it.direction), nil } type sampleBatchIterator struct { @@ -537,7 +539,7 @@ func (it *sampleBatchIterator) newChunksIterator(b *chunkBatch) (iter.SampleIter return nil, err } - return iter.NewHeapSampleIterator(it.ctx, iters), nil + return 
iter.NewSortSampleIterator(iters), nil } func (it *sampleBatchIterator) buildIterators(chks map[model.Fingerprint][][]*LazyChunk, from, through time.Time, nextChunk *LazyChunk) ([]iter.SampleIterator, error) { @@ -574,7 +576,7 @@ func (it *sampleBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, thro result = append(result, iter.NewNonOverlappingSampleIterator(iterators, "")) } - return iter.NewHeapSampleIterator(it.ctx, result), nil + return iter.NewMergeSampleIterator(it.ctx, result), nil } func removeMatchersByName(matchers []*labels.Matcher, names ...string) []*labels.Matcher {