chore: Rename `HeapIterator` to `MergeEntryIterator` (#13975)

Stumbled across this iterator interface and was confused by its name. The name leaked implementation details, and those details were no longer even accurate, since the implementation has changed (it now uses a loser tree rather than a heap).

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
Branch: pull/13950/head
Christian Haudum authored 1 year ago, committed by GitHub
Parent: 7c1a8493b3
Commit: 8afdfd500c
Signature: no known key found for this signature in database (GPG Key ID: B5690EEEBB952194)
1. pkg/iter/entry_iterator.go (13 changes)
2. pkg/iter/entry_iterator_test.go (8 changes)
3. pkg/querier/tail.go (2 changes)
4. pkg/storage/batch.go (4 changes)
5. pkg/storage/batch_test.go (4 changes)

pkg/iter/entry_iterator.go

@@ -53,16 +53,17 @@ func (i *streamIterator) Close() error {
 	return nil
 }
-// HeapIterator iterates over a heap of iterators with ability to push new iterators and get some properties like time of entry at peek and len
-// Not safe for concurrent use
-type HeapIterator interface {
+// MergeEntryIterator exposes additional fields that are used by the Tailer only.
+// Not safe for concurrent use!
+type MergeEntryIterator interface {
 	EntryIterator
 	Peek() time.Time
 	IsEmpty() bool
 	Push(EntryIterator)
 }
-// mergeEntryIterator iterates over a heap of iterators and merge duplicate entries.
+// mergeEntryIterator implements the MergeEntryIterator interface functions.
 type mergeEntryIterator struct {
 	tree *loser.Tree[sortFields, EntryIterator]
 	stats *stats.Context
@@ -74,11 +75,11 @@ type mergeEntryIterator struct {
 	errs []error
 }
-// NewMergeEntryIterator returns a new iterator which uses a heap to merge together entries for multiple iterators and deduplicate entries if any.
+// NewMergeEntryIterator returns a new iterator which uses a loser tree to merge together entries for multiple iterators and deduplicate entries if any.
 // The iterator only orders and merges entries across the given `is` iterators; it does not merge entries within an individual iterator.
 // This means using this iterator with a single iterator will yield the same result as the input iterator.
 // If you don't need to deduplicate entries, use `NewSortEntryIterator` instead.
-func NewMergeEntryIterator(ctx context.Context, is []EntryIterator, direction logproto.Direction) HeapIterator {
+func NewMergeEntryIterator(ctx context.Context, is []EntryIterator, direction logproto.Direction) MergeEntryIterator {
 	maxVal, less := treeLess(direction)
 	result := &mergeEntryIterator{stats: stats.FromContext(ctx)}
 	result.tree = loser.New(is, maxVal, sortFieldsAt, less, result.closeEntry)

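For orientation, here is a minimal usage sketch of the renamed constructor; it is not code from this commit. The v3 import paths and `NewStreamIterator(logproto.Stream)` are assumptions about the surrounding package, so adjust them to your checkout:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/grafana/loki/v3/pkg/iter"
	"github.com/grafana/loki/v3/pkg/logproto"
)

func main() {
	entries := []logproto.Entry{
		{Timestamp: time.Unix(0, 1), Line: "a"},
		{Timestamp: time.Unix(0, 2), Line: "b"},
	}
	// Two iterators over the same stream, as can happen with replicated chunks:
	// the merge iterator deduplicates, so each line is yielded only once.
	is := []iter.EntryIterator{
		iter.NewStreamIterator(logproto.Stream{Labels: `{job="app"}`, Entries: entries}),
		iter.NewStreamIterator(logproto.Stream{Labels: `{job="app"}`, Entries: entries}),
	}

	it := iter.NewMergeEntryIterator(context.Background(), is, logproto.FORWARD)
	defer it.Close()

	for it.Next() {
		fmt.Println(it.At().Timestamp.UnixNano(), it.At().Line)
	}
}
```

If deduplication is not needed, the doc comment above points to `NewSortEntryIterator`, which only orders entries across the inputs.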
pkg/iter/entry_iterator_test.go

@@ -162,16 +162,16 @@ func TestIteratorMultipleLabels(t *testing.T) {
 func TestMergeIteratorPrefetch(t *testing.T) {
 	t.Parallel()
-	type tester func(t *testing.T, i HeapIterator)
+	type tester func(t *testing.T, i MergeEntryIterator)
 	tests := map[string]tester{
-		"prefetch on IsEmpty() when called as first method": func(t *testing.T, i HeapIterator) {
+		"prefetch on IsEmpty() when called as first method": func(t *testing.T, i MergeEntryIterator) {
 			assert.Equal(t, false, i.IsEmpty())
 		},
-		"prefetch on Peek() when called as first method": func(t *testing.T, i HeapIterator) {
+		"prefetch on Peek() when called as first method": func(t *testing.T, i MergeEntryIterator) {
 			assert.Equal(t, time.Unix(0, 0), i.Peek())
 		},
-		"prefetch on Next() when called as first method": func(t *testing.T, i HeapIterator) {
+		"prefetch on Next() when called as first method": func(t *testing.T, i MergeEntryIterator) {
 			assert.True(t, i.Next())
 			assert.Equal(t, logproto.Entry{Timestamp: time.Unix(0, 0), Line: "0"}, i.At())
 		},

pkg/querier/tail.go

@@ -38,7 +38,7 @@ const (
 // Tailer manages complete lifecycle of a tail request
 type Tailer struct {
 	// openStreamIterator is for streams already open
-	openStreamIterator iter.HeapIterator
+	openStreamIterator iter.MergeEntryIterator
 	streamMtx sync.Mutex // for synchronizing access to openStreamIterator
 	currEntry logproto.Entry

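The extra `Push`, `Peek` and `IsEmpty` methods that distinguish `MergeEntryIterator` from a plain `EntryIterator` exist for this tailing use case. Below is a hypothetical, stripped-down stand-in for the Tailer (the names `tailState`, `push` and `drainBefore` are illustrative only, not Loki API) showing the intended call pattern: new stream iterators are pushed under the mutex, and `IsEmpty`/`Peek` decide how far to drain:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/grafana/loki/v3/pkg/iter"
	"github.com/grafana/loki/v3/pkg/logproto"
)

// tailState is a hypothetical stand-in for the real Tailer, reduced to the
// fields relevant to MergeEntryIterator usage.
type tailState struct {
	streamMtx          sync.Mutex // guards openStreamIterator, which is not safe for concurrent use
	openStreamIterator iter.MergeEntryIterator
}

// push feeds a newly opened stream into the shared merge iterator.
func (t *tailState) push(s logproto.Stream) {
	t.streamMtx.Lock()
	defer t.streamMtx.Unlock()
	t.openStreamIterator.Push(iter.NewStreamIterator(s))
}

// drainBefore returns all buffered entries with timestamps before the deadline.
func (t *tailState) drainBefore(deadline time.Time) []logproto.Entry {
	t.streamMtx.Lock()
	defer t.streamMtx.Unlock()

	var out []logproto.Entry
	// Peek exposes the timestamp of the next entry without consuming it;
	// IsEmpty guards against peeking at a drained iterator.
	for !t.openStreamIterator.IsEmpty() && t.openStreamIterator.Peek().Before(deadline) {
		if !t.openStreamIterator.Next() {
			break
		}
		out = append(out, t.openStreamIterator.At())
	}
	return out
}

func main() {
	t := &tailState{
		openStreamIterator: iter.NewMergeEntryIterator(context.Background(), []iter.EntryIterator{}, logproto.FORWARD),
	}
	t.push(logproto.Stream{
		Labels:  `{job="app"}`,
		Entries: []logproto.Entry{{Timestamp: time.Unix(1, 0), Line: "hello"}},
	})
	fmt.Println(t.drainBefore(time.Unix(2, 0)))
}
```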
pkg/storage/batch.go

@@ -421,7 +421,7 @@ func (it *logBatchIterator) buildIterators(chks map[model.Fingerprint][][]*LazyC
 	for _, chunks := range chks {
 		if len(chunks) != 0 && len(chunks[0]) != 0 {
 			streamPipeline := it.pipeline.ForStream(labels.NewBuilder(chunks[0][0].Chunk.Metric).Del(labels.MetricName).Labels())
-			iterator, err := it.buildHeapIterator(chunks, from, through, streamPipeline, nextChunk)
+			iterator, err := it.buildMergeIterator(chunks, from, through, streamPipeline, nextChunk)
 			if err != nil {
 				return nil, err
 			}
@@ -433,7 +433,7 @@ func (it *logBatchIterator) buildIterators(chks map[model.Fingerprint][][]*LazyC
 	return result, nil
 }
-func (it *logBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, through time.Time, streamPipeline log.StreamPipeline, nextChunk *LazyChunk) (iter.EntryIterator, error) {
+func (it *logBatchIterator) buildMergeIterator(chks [][]*LazyChunk, from, through time.Time, streamPipeline log.StreamPipeline, nextChunk *LazyChunk) (iter.EntryIterator, error) {
 	result := make([]iter.EntryIterator, 0, len(chks))
 	for i := range chks {

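The batch.go changes are a mechanical rename, but the shape of the code is worth spelling out: `buildIterators` groups chunks by stream, and `buildMergeIterator` turns each group into a single merge iterator. A hedged sketch of that per-stream pattern follows; `mergePerStream` and its signature are illustrative only, not Loki API:

```go
package example

import (
	"context"

	"github.com/grafana/loki/v3/pkg/iter"
	"github.com/grafana/loki/v3/pkg/logproto"
)

// mergePerStream illustrates the buildMergeIterator pattern: each inner slice
// holds the entry iterators of one stream's (possibly overlapping) chunks, and
// each stream gets exactly one merge iterator so entries duplicated across
// chunks, e.g. by replication, are collapsed.
func mergePerStream(ctx context.Context, perStream [][]iter.EntryIterator, direction logproto.Direction) []iter.EntryIterator {
	result := make([]iter.EntryIterator, 0, len(perStream))
	for _, its := range perStream {
		// Ordering and deduplication happen across the iterators of one
		// stream, never within a single chunk iterator.
		result = append(result, iter.NewMergeEntryIterator(ctx, its, direction))
	}
	return result
}
```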
pkg/storage/batch_test.go

@@ -1649,9 +1649,9 @@ func TestBuildHeapIterator(t *testing.T) {
 				ctx: ctx,
 				pipeline: log.NewNoopPipeline(),
 			}
-			it, err := b.buildHeapIterator(tc.input, from, from.Add(6*time.Millisecond), b.pipeline.ForStream(labels.Labels{labels.Label{Name: "foo", Value: "bar"}}), nil)
+			it, err := b.buildMergeIterator(tc.input, from, from.Add(6*time.Millisecond), b.pipeline.ForStream(labels.Labels{labels.Label{Name: "foo", Value: "bar"}}), nil)
 			if err != nil {
-				t.Errorf("buildHeapIterator error = %v", err)
+				t.Errorf("buildMergeIterator error = %v", err)
 				return
 			}
 			req := newQuery("{foo=\"bar\"}", from, from.Add(6*time.Millisecond), nil, nil)
