diff --git a/pkg/iter/entry_iterator.go b/pkg/iter/entry_iterator.go
index 58e0ab929e..60bf032cab 100644
--- a/pkg/iter/entry_iterator.go
+++ b/pkg/iter/entry_iterator.go
@@ -53,16 +53,17 @@ func (i *streamIterator) Close() error {
 	return nil
 }
 
-// HeapIterator iterates over a heap of iterators with ability to push new iterators and get some properties like time of entry at peek and len
-// Not safe for concurrent use
-type HeapIterator interface {
+// MergeEntryIterator exposes additional methods that are used by the Tailer only.
+// Not safe for concurrent use!
+type MergeEntryIterator interface {
 	EntryIterator
+
 	Peek() time.Time
 	IsEmpty() bool
 	Push(EntryIterator)
 }
 
-// mergeEntryIterator iterates over a heap of iterators and merge duplicate entries.
+// mergeEntryIterator implements the MergeEntryIterator interface.
 type mergeEntryIterator struct {
 	tree  *loser.Tree[sortFields, EntryIterator]
 	stats *stats.Context
@@ -74,11 +75,11 @@ type mergeEntryIterator struct {
 	errs []error
 }
 
-// NewMergeEntryIterator returns a new iterator which uses a heap to merge together entries for multiple iterators and deduplicate entries if any.
+// NewMergeEntryIterator returns a new iterator which uses a loser tree to merge together entries for multiple iterators and deduplicate entries if any.
 // The iterator only order and merge entries across given `is` iterators, it does not merge entries within individual iterator.
 // This means using this iterator with a single iterator will result in the same result as the input iterator.
 // If you don't need to deduplicate entries, use `NewSortEntryIterator` instead.
-func NewMergeEntryIterator(ctx context.Context, is []EntryIterator, direction logproto.Direction) HeapIterator {
+func NewMergeEntryIterator(ctx context.Context, is []EntryIterator, direction logproto.Direction) MergeEntryIterator {
 	maxVal, less := treeLess(direction)
 	result := &mergeEntryIterator{stats: stats.FromContext(ctx)}
 	result.tree = loser.New(is, maxVal, sortFieldsAt, less, result.closeEntry)
diff --git a/pkg/iter/entry_iterator_test.go b/pkg/iter/entry_iterator_test.go
index e49ecf3ee5..fb1548ddc3 100644
--- a/pkg/iter/entry_iterator_test.go
+++ b/pkg/iter/entry_iterator_test.go
@@ -162,16 +162,16 @@ func TestIteratorMultipleLabels(t *testing.T) {
 func TestMergeIteratorPrefetch(t *testing.T) {
 	t.Parallel()
 
-	type tester func(t *testing.T, i HeapIterator)
+	type tester func(t *testing.T, i MergeEntryIterator)
 
 	tests := map[string]tester{
-		"prefetch on IsEmpty() when called as first method": func(t *testing.T, i HeapIterator) {
+		"prefetch on IsEmpty() when called as first method": func(t *testing.T, i MergeEntryIterator) {
 			assert.Equal(t, false, i.IsEmpty())
 		},
-		"prefetch on Peek() when called as first method": func(t *testing.T, i HeapIterator) {
+		"prefetch on Peek() when called as first method": func(t *testing.T, i MergeEntryIterator) {
 			assert.Equal(t, time.Unix(0, 0), i.Peek())
 		},
-		"prefetch on Next() when called as first method": func(t *testing.T, i HeapIterator) {
+		"prefetch on Next() when called as first method": func(t *testing.T, i MergeEntryIterator) {
 			assert.True(t, i.Next())
 			assert.Equal(t, logproto.Entry{Timestamp: time.Unix(0, 0), Line: "0"}, i.At())
 		},
diff --git a/pkg/querier/tail.go b/pkg/querier/tail.go
index 87dba6bae0..0d9495daf6 100644
--- a/pkg/querier/tail.go
+++ b/pkg/querier/tail.go
@@ -38,7 +38,7 @@ const (
 // Tailer manages complete lifecycle of a tail request
 type Tailer struct {
 	// openStreamIterator is for streams already open
-	openStreamIterator iter.HeapIterator
+	openStreamIterator iter.MergeEntryIterator
 	streamMtx          sync.Mutex // for synchronizing access to openStreamIterator
 
 	currEntry  logproto.Entry
diff --git a/pkg/storage/batch.go b/pkg/storage/batch.go
index 46f708d091..739a27f9b2 100644
--- a/pkg/storage/batch.go
+++ b/pkg/storage/batch.go
@@ -421,7 +421,7 @@ func (it *logBatchIterator) buildIterators(chks map[model.Fingerprint][][]*LazyC
 	for _, chunks := range chks {
 		if len(chunks) != 0 && len(chunks[0]) != 0 {
 			streamPipeline := it.pipeline.ForStream(labels.NewBuilder(chunks[0][0].Chunk.Metric).Del(labels.MetricName).Labels())
-			iterator, err := it.buildHeapIterator(chunks, from, through, streamPipeline, nextChunk)
+			iterator, err := it.buildMergeIterator(chunks, from, through, streamPipeline, nextChunk)
 			if err != nil {
 				return nil, err
 			}
@@ -433,7 +433,7 @@ func (it *logBatchIterator) buildIterators(chks map[model.Fingerprint][][]*LazyC
 	return result, nil
 }
 
-func (it *logBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, through time.Time, streamPipeline log.StreamPipeline, nextChunk *LazyChunk) (iter.EntryIterator, error) {
+func (it *logBatchIterator) buildMergeIterator(chks [][]*LazyChunk, from, through time.Time, streamPipeline log.StreamPipeline, nextChunk *LazyChunk) (iter.EntryIterator, error) {
 	result := make([]iter.EntryIterator, 0, len(chks))
 
 	for i := range chks {
diff --git a/pkg/storage/batch_test.go b/pkg/storage/batch_test.go
index 0159c20a19..34d8e35004 100644
--- a/pkg/storage/batch_test.go
+++ b/pkg/storage/batch_test.go
@@ -1649,9 +1649,9 @@ func TestBuildHeapIterator(t *testing.T) {
 				ctx:      ctx,
 				pipeline: log.NewNoopPipeline(),
 			}
-			it, err := b.buildHeapIterator(tc.input, from, from.Add(6*time.Millisecond), b.pipeline.ForStream(labels.Labels{labels.Label{Name: "foo", Value: "bar"}}), nil)
+			it, err := b.buildMergeIterator(tc.input, from, from.Add(6*time.Millisecond), b.pipeline.ForStream(labels.Labels{labels.Label{Name: "foo", Value: "bar"}}), nil)
 			if err != nil {
-				t.Errorf("buildHeapIterator error = %v", err)
+				t.Errorf("buildMergeIterator error = %v", err)
 				return
 			}
 			req := newQuery("{foo=\"bar\"}", from, from.Add(6*time.Millisecond), nil, nil)
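
Note for reviewers: the sketch below is not part of the change; it is a hedged illustration of how a caller such as the Tailer might drive the renamed `MergeEntryIterator`. Only `iter.NewMergeEntryIterator`, `iter.NewStreamIterator`, and the interface methods visible in the hunks above (`Peek`, `IsEmpty`, `Push`, `Next`, `At`) are taken from the code; the `newTestStream` helper, the literal streams, and the exact import paths are assumptions made for the example.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/grafana/loki/pkg/iter"     // import paths assumed for this sketch
	"github.com/grafana/loki/pkg/logproto"
)

// newTestStream is a hypothetical helper for this sketch: it wraps a few
// timestamps in a logproto.Stream so we can build EntryIterators from it.
func newTestStream(labels string, ts ...time.Time) logproto.Stream {
	s := logproto.Stream{Labels: labels}
	for _, t := range ts {
		s.Entries = append(s.Entries, logproto.Entry{Timestamp: t, Line: t.String()})
	}
	return s
}

func main() {
	now := time.Unix(0, 0)

	// Two overlapping per-stream iterators; NewMergeEntryIterator merges them
	// in timestamp order and deduplicates identical entries via the loser tree.
	its := []iter.EntryIterator{
		iter.NewStreamIterator(newTestStream(`{job="a"}`, now, now.Add(time.Second))),
		iter.NewStreamIterator(newTestStream(`{job="a"}`, now)),
	}

	it := iter.NewMergeEntryIterator(context.Background(), its, logproto.FORWARD)
	defer it.Close()

	// The Tailer-only methods exposed by MergeEntryIterator.
	if !it.IsEmpty() {
		fmt.Println("next entry at:", it.Peek())
	}
	// A stream discovered while tailing is pushed into the same merge.
	it.Push(iter.NewStreamIterator(newTestStream(`{job="b"}`, now.Add(2*time.Second))))

	// Plain EntryIterator usage, as in the prefetch tests above.
	for it.Next() {
		e := it.At()
		fmt.Println(e.Timestamp.UnixNano(), e.Line)
	}
}
```

The `Push` call is the part that motivates keeping a dedicated interface: the Tailer can fold newly opened streams into the existing merge instead of rebuilding the iterator.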