Refactor HeapIterator into Merge and Sort Iterator. (#5281)

* Refactor HeapIterator into merge and sort Iterator.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* lint

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
pull/5266/head^2
Cyril Tovena 4 years ago committed by GitHub
parent 44ba39055d
commit 3af7b7900e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      pkg/chunkenc/memchunk.go
  2. 4
      pkg/chunkenc/unordered.go
  3. 23
      pkg/ingester/ingester.go
  4. 8
      pkg/ingester/instance.go
  5. 9
      pkg/ingester/instance_test.go
  6. 4
      pkg/ingester/stream.go
  7. 185
      pkg/iter/entry_iterator.go
  8. 166
      pkg/iter/entry_iterator_test.go
  9. 162
      pkg/iter/sample_iterator.go
  10. 114
      pkg/iter/sample_iterator_test.go
  11. 11
      pkg/logcli/client/file.go
  12. 32
      pkg/logql/engine_test.go
  13. 2
      pkg/logql/range_vector_test.go
  14. 4
      pkg/logql/sharding.go
  15. 4
      pkg/logql/test_utils.go
  16. 8
      pkg/querier/querier.go
  17. 2
      pkg/querier/tail.go
  18. 12
      pkg/storage/batch.go

@ -818,7 +818,7 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
if ordered { if ordered {
it = iter.NewNonOverlappingIterator(blockItrs, "") it = iter.NewNonOverlappingIterator(blockItrs, "")
} else { } else {
it = iter.NewHeapIterator(ctx, blockItrs, direction) it = iter.NewSortEntryIterator(blockItrs, direction)
} }
return iter.NewTimeRangedIterator( return iter.NewTimeRangedIterator(
@ -851,7 +851,7 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
if ordered { if ordered {
return iter.NewNonOverlappingIterator(blockItrs, ""), nil return iter.NewNonOverlappingIterator(blockItrs, ""), nil
} }
return iter.NewHeapIterator(ctx, blockItrs, direction), nil return iter.NewSortEntryIterator(blockItrs, direction), nil
} }
// Iterator implements Chunk. // Iterator implements Chunk.
@ -886,7 +886,7 @@ func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time,
if ordered { if ordered {
it = iter.NewNonOverlappingSampleIterator(its, "") it = iter.NewNonOverlappingSampleIterator(its, "")
} else { } else {
it = iter.NewHeapSampleIterator(ctx, its) it = iter.NewSortSampleIterator(its)
} }
return iter.NewTimeRangedSampleIterator( return iter.NewTimeRangedSampleIterator(
@ -1041,7 +1041,7 @@ func (hb *headBlock) Iterator(ctx context.Context, direction logproto.Direction,
for _, stream := range streams { for _, stream := range streams {
streamsResult = append(streamsResult, *stream) streamsResult = append(streamsResult, *stream)
} }
return iter.NewStreamsIterator(ctx, streamsResult, direction) return iter.NewStreamsIterator(streamsResult, direction)
} }
func (hb *headBlock) SampleIterator(ctx context.Context, mint, maxt int64, extractor log.StreamSampleExtractor) iter.SampleIterator { func (hb *headBlock) SampleIterator(ctx context.Context, mint, maxt int64, extractor log.StreamSampleExtractor) iter.SampleIterator {
@ -1082,7 +1082,7 @@ func (hb *headBlock) SampleIterator(ctx context.Context, mint, maxt int64, extra
for _, s := range series { for _, s := range series {
seriesRes = append(seriesRes, *s) seriesRes = append(seriesRes, *s)
} }
return iter.SampleIteratorWithClose(iter.NewMultiSeriesIterator(ctx, seriesRes), func() error { return iter.SampleIteratorWithClose(iter.NewMultiSeriesIterator(seriesRes), func() error {
for _, s := range series { for _, s := range series {
SamplesPool.Put(s.Samples) SamplesPool.Put(s.Samples)
} }

@ -257,7 +257,7 @@ func (hb *unorderedHeadBlock) Iterator(
for _, stream := range streams { for _, stream := range streams {
streamsResult = append(streamsResult, *stream) streamsResult = append(streamsResult, *stream)
} }
return iter.NewStreamsIterator(ctx, streamsResult, direction) return iter.NewStreamsIterator(streamsResult, direction)
} }
// nolint:unused // nolint:unused
@ -308,7 +308,7 @@ func (hb *unorderedHeadBlock) SampleIterator(
for _, s := range series { for _, s := range series {
seriesRes = append(seriesRes, *s) seriesRes = append(seriesRes, *s)
} }
return iter.SampleIteratorWithClose(iter.NewMultiSeriesIterator(ctx, seriesRes), func() error { return iter.SampleIteratorWithClose(iter.NewMultiSeriesIterator(seriesRes), func() error {
for _, s := range series { for _, s := range series {
SamplesPool.Put(s.Samples) SamplesPool.Put(s.Samples)
} }

@ -561,7 +561,7 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie
} }
instance := i.GetOrCreateInstance(instanceID) instance := i.GetOrCreateInstance(instanceID)
itrs, err := instance.Query(ctx, logql.SelectLogParams{QueryRequest: req}) it, err := instance.Query(ctx, logql.SelectLogParams{QueryRequest: req})
if err != nil { if err != nil {
return err return err
} }
@ -577,17 +577,15 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie
}} }}
storeItr, err := i.store.SelectLogs(ctx, storeReq) storeItr, err := i.store.SelectLogs(ctx, storeReq)
if err != nil { if err != nil {
errUtil.LogErrorWithContext(ctx, "closing iterator", it.Close)
return err return err
} }
it = iter.NewMergeEntryIterator(ctx, []iter.EntryIterator{it, storeItr}, req.Direction)
itrs = append(itrs, storeItr)
} }
heapItr := iter.NewHeapIterator(ctx, itrs, req.Direction) defer errUtil.LogErrorWithContext(ctx, "closing iterator", it.Close)
defer errUtil.LogErrorWithContext(ctx, "closing iterator", heapItr.Close)
return sendBatches(ctx, heapItr, queryServer, req.Limit) return sendBatches(ctx, it, queryServer, req.Limit)
} }
// QuerySample the ingesters for series from logs matching a set of matchers. // QuerySample the ingesters for series from logs matching a set of matchers.
@ -601,7 +599,7 @@ func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer log
} }
instance := i.GetOrCreateInstance(instanceID) instance := i.GetOrCreateInstance(instanceID)
itrs, err := instance.QuerySample(ctx, logql.SelectSampleParams{SampleQueryRequest: req}) it, err := instance.QuerySample(ctx, logql.SelectSampleParams{SampleQueryRequest: req})
if err != nil { if err != nil {
return err return err
} }
@ -615,17 +613,16 @@ func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer log
}} }}
storeItr, err := i.store.SelectSamples(ctx, storeReq) storeItr, err := i.store.SelectSamples(ctx, storeReq)
if err != nil { if err != nil {
errUtil.LogErrorWithContext(ctx, "closing iterator", it.Close)
return err return err
} }
itrs = append(itrs, storeItr) it = iter.NewMergeSampleIterator(ctx, []iter.SampleIterator{it, storeItr})
} }
heapItr := iter.NewHeapSampleIterator(ctx, itrs) defer errUtil.LogErrorWithContext(ctx, "closing iterator", it.Close)
defer errUtil.LogErrorWithContext(ctx, "closing iterator", heapItr.Close)
return sendSampleBatches(ctx, heapItr, queryServer) return sendSampleBatches(ctx, it, queryServer)
} }
// boltdbShipperMaxLookBack returns a max look back period only if active index type is boltdb-shipper. // boltdbShipperMaxLookBack returns a max look back period only if active index type is boltdb-shipper.

@ -306,7 +306,7 @@ func (i *instance) getLabelsFromFingerprint(fp model.Fingerprint) labels.Labels
return s.labels return s.labels
} }
func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) ([]iter.EntryIterator, error) { func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error) {
expr, err := req.LogSelector() expr, err := req.LogSelector()
if err != nil { if err != nil {
return nil, err return nil, err
@ -341,10 +341,10 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) ([]iter
return nil, err return nil, err
} }
return iters, nil return iter.NewSortEntryIterator(iters, req.Direction), nil
} }
func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams) ([]iter.SampleIterator, error) { func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error) {
expr, err := req.Expr() expr, err := req.Expr()
if err != nil { if err != nil {
return nil, err return nil, err
@ -386,7 +386,7 @@ func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams
return nil, err return nil, err
} }
return iters, nil return iter.NewSortSampleIterator(iters), nil
} }
// Label returns the label names or values depending on the given request // Label returns the label names or values depending on the given request

@ -16,7 +16,6 @@ import (
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/logql"
loki_runtime "github.com/grafana/loki/pkg/runtime" loki_runtime "github.com/grafana/loki/pkg/runtime"
@ -497,7 +496,7 @@ func Test_Iterator(t *testing.T) {
} }
// prepare iterators. // prepare iterators.
itrs, err := instance.Query(ctx, it, err := instance.Query(ctx,
logql.SelectLogParams{ logql.SelectLogParams{
QueryRequest: &logproto.QueryRequest{ QueryRequest: &logproto.QueryRequest{
Selector: `{job="3"} | logfmt`, Selector: `{job="3"} | logfmt`,
@ -509,12 +508,11 @@ func Test_Iterator(t *testing.T) {
}, },
) )
require.NoError(t, err) require.NoError(t, err)
heapItr := iter.NewHeapIterator(ctx, itrs, direction)
// assert the order is preserved. // assert the order is preserved.
var res *logproto.QueryResponse var res *logproto.QueryResponse
require.NoError(t, require.NoError(t,
sendBatches(ctx, heapItr, sendBatches(ctx, it,
fakeQueryServer( fakeQueryServer(
func(qr *logproto.QueryResponse) error { func(qr *logproto.QueryResponse) error {
res = qr res = qr
@ -578,7 +576,7 @@ func Test_ChunkFilter(t *testing.T) {
} }
// prepare iterators. // prepare iterators.
itrs, err := instance.Query(ctx, it, err := instance.Query(ctx,
logql.SelectLogParams{ logql.SelectLogParams{
QueryRequest: &logproto.QueryRequest{ QueryRequest: &logproto.QueryRequest{
Selector: `{job="3"}`, Selector: `{job="3"}`,
@ -590,7 +588,6 @@ func Test_ChunkFilter(t *testing.T) {
}, },
) )
require.NoError(t, err) require.NoError(t, err)
it := iter.NewHeapIterator(ctx, itrs, direction)
defer it.Close() defer it.Close()
for it.Next() { for it.Next() {

@ -470,7 +470,7 @@ func (s *stream) Iterator(ctx context.Context, statsCtx *stats.Context, from, th
if ordered { if ordered {
return iter.NewNonOverlappingIterator(iterators, ""), nil return iter.NewNonOverlappingIterator(iterators, ""), nil
} }
return iter.NewHeapIterator(ctx, iterators, direction), nil return iter.NewSortEntryIterator(iterators, direction), nil
} }
// Returns an SampleIterator. // Returns an SampleIterator.
@ -507,7 +507,7 @@ func (s *stream) SampleIterator(ctx context.Context, statsCtx *stats.Context, fr
if ordered { if ordered {
return iter.NewNonOverlappingSampleIterator(iterators, ""), nil return iter.NewNonOverlappingSampleIterator(iterators, ""), nil
} }
return iter.NewHeapSampleIterator(ctx, iterators), nil return iter.NewSortSampleIterator(iterators), nil
} }
func (s *stream) addTailer(t *tailer) { func (s *stream) addTailer(t *tailer) {

@ -135,8 +135,8 @@ type HeapIterator interface {
Push(EntryIterator) Push(EntryIterator)
} }
// heapIterator iterates over a heap of iterators. // mergeEntryIterator iterates over a heap of iterators and merge duplicate entries.
type heapIterator struct { type mergeEntryIterator struct {
heap interface { heap interface {
heap.Interface heap.Interface
Peek() EntryIterator Peek() EntryIterator
@ -145,16 +145,17 @@ type heapIterator struct {
prefetched bool prefetched bool
stats *stats.Context stats *stats.Context
tuples []tuple tuples []tuple
currEntry logproto.Entry currEntry entryWithLabels
currLabels string errs []error
errs []error
} }
// NewHeapIterator returns a new iterator which uses a heap to merge together // NewMergeEntryIterator returns a new iterator which uses a heap to merge together entries for multiple iterators and deduplicate entries if any.
// entries for multiple interators. // The iterator only order and merge entries across given `is` iterators, it does not merge entries within individual iterator.
func NewHeapIterator(ctx context.Context, is []EntryIterator, direction logproto.Direction) HeapIterator { // This means using this iterator with a single iterator will result in the same result as the input iterator.
result := &heapIterator{is: is, stats: stats.FromContext(ctx)} // If you don't need to deduplicate entries, use `NewSortEntryIterator` instead.
func NewMergeEntryIterator(ctx context.Context, is []EntryIterator, direction logproto.Direction) HeapIterator {
result := &mergeEntryIterator{is: is, stats: stats.FromContext(ctx)}
switch direction { switch direction {
case logproto.BACKWARD: case logproto.BACKWARD:
result.heap = &iteratorMaxHeap{iteratorHeap: make([]EntryIterator, 0, len(is))} result.heap = &iteratorMaxHeap{iteratorHeap: make([]EntryIterator, 0, len(is))}
@ -171,7 +172,7 @@ func NewHeapIterator(ctx context.Context, is []EntryIterator, direction logproto
// prefetch iterates over all inner iterators to merge together, calls Next() on // prefetch iterates over all inner iterators to merge together, calls Next() on
// each of them to prefetch the first entry and pushes of them - who are not // each of them to prefetch the first entry and pushes of them - who are not
// empty - to the heap // empty - to the heap
func (i *heapIterator) prefetch() { func (i *mergeEntryIterator) prefetch() {
if i.prefetched { if i.prefetched {
return return
} }
@ -192,7 +193,7 @@ func (i *heapIterator) prefetch() {
// //
// If the iterator has no more entries or an error occur while advancing it, the iterator // If the iterator has no more entries or an error occur while advancing it, the iterator
// is not pushed to the heap and any possible error captured, so that can be get via Error(). // is not pushed to the heap and any possible error captured, so that can be get via Error().
func (i *heapIterator) requeue(ei EntryIterator, advanced bool) { func (i *mergeEntryIterator) requeue(ei EntryIterator, advanced bool) {
if advanced || ei.Next() { if advanced || ei.Next() {
heap.Push(i.heap, ei) heap.Push(i.heap, ei)
return return
@ -204,7 +205,7 @@ func (i *heapIterator) requeue(ei EntryIterator, advanced bool) {
util.LogError("closing iterator", ei.Close) util.LogError("closing iterator", ei.Close)
} }
func (i *heapIterator) Push(ei EntryIterator) { func (i *mergeEntryIterator) Push(ei EntryIterator) {
i.requeue(ei, false) i.requeue(ei, false)
} }
@ -213,7 +214,7 @@ type tuple struct {
EntryIterator EntryIterator
} }
func (i *heapIterator) Next() bool { func (i *mergeEntryIterator) Next() bool {
i.prefetch() i.prefetch()
if i.heap.Len() == 0 { if i.heap.Len() == 0 {
@ -222,8 +223,8 @@ func (i *heapIterator) Next() bool {
// shortcut for the last iterator. // shortcut for the last iterator.
if i.heap.Len() == 1 { if i.heap.Len() == 1 {
i.currEntry = i.heap.Peek().Entry() i.currEntry.entry = i.heap.Peek().Entry()
i.currLabels = i.heap.Peek().Labels() i.currEntry.labels = i.heap.Peek().Labels()
if !i.heap.Peek().Next() { if !i.heap.Peek().Next() {
i.heap.Pop() i.heap.Pop()
} }
@ -250,8 +251,8 @@ func (i *heapIterator) Next() bool {
// shortcut if we have a single tuple. // shortcut if we have a single tuple.
if len(i.tuples) == 1 { if len(i.tuples) == 1 {
i.currEntry = i.tuples[0].Entry i.currEntry.entry = i.tuples[0].Entry
i.currLabels = i.tuples[0].Labels() i.currEntry.labels = i.tuples[0].Labels()
i.requeue(i.tuples[0].EntryIterator, false) i.requeue(i.tuples[0].EntryIterator, false)
i.tuples = i.tuples[:0] i.tuples = i.tuples[:0]
return true return true
@ -260,12 +261,12 @@ func (i *heapIterator) Next() bool {
// Find in tuples which entry occurs most often which, due to quorum based // Find in tuples which entry occurs most often which, due to quorum based
// replication, is guaranteed to be the correct next entry. // replication, is guaranteed to be the correct next entry.
t := i.tuples[0] t := i.tuples[0]
i.currEntry = t.Entry i.currEntry.entry = t.Entry
i.currLabels = t.Labels() i.currEntry.labels = t.Labels()
// Requeue the iterators, advancing them if they were consumed. // Requeue the iterators, advancing them if they were consumed.
for j := range i.tuples { for j := range i.tuples {
if i.tuples[j].Line != i.currEntry.Line { if i.tuples[j].Line != i.currEntry.entry.Line {
i.requeue(i.tuples[j].EntryIterator, true) i.requeue(i.tuples[j].EntryIterator, true)
continue continue
} }
@ -279,15 +280,15 @@ func (i *heapIterator) Next() bool {
return true return true
} }
func (i *heapIterator) Entry() logproto.Entry { func (i *mergeEntryIterator) Entry() logproto.Entry {
return i.currEntry return i.currEntry.entry
} }
func (i *heapIterator) Labels() string { func (i *mergeEntryIterator) Labels() string {
return i.currLabels return i.currEntry.labels
} }
func (i *heapIterator) Error() error { func (i *mergeEntryIterator) Error() error {
switch len(i.errs) { switch len(i.errs) {
case 0: case 0:
return nil return nil
@ -298,7 +299,7 @@ func (i *heapIterator) Error() error {
} }
} }
func (i *heapIterator) Close() error { func (i *mergeEntryIterator) Close() error {
for i.heap.Len() > 0 { for i.heap.Len() > 0 {
if err := i.heap.Pop().(EntryIterator).Close(); err != nil { if err := i.heap.Pop().(EntryIterator).Close(); err != nil {
return err return err
@ -308,35 +309,143 @@ func (i *heapIterator) Close() error {
return nil return nil
} }
func (i *heapIterator) Peek() time.Time { func (i *mergeEntryIterator) Peek() time.Time {
i.prefetch() i.prefetch()
return i.heap.Peek().Entry().Timestamp return i.heap.Peek().Entry().Timestamp
} }
// Len returns the number of inner iterators on the heap, still having entries // Len returns the number of inner iterators on the heap, still having entries
func (i *heapIterator) Len() int { func (i *mergeEntryIterator) Len() int {
i.prefetch() i.prefetch()
return i.heap.Len() return i.heap.Len()
} }
// entrySortIterator merge-sorts entries across a set of input iterators by
// timestamp, without deduplicating entries that appear in several inputs
// (unlike mergeEntryIterator).
type entrySortIterator struct {
	// heap orders the non-exhausted input iterators by their current entry;
	// min- or max-heap depending on the requested direction.
	heap interface {
		heap.Interface
		Peek() EntryIterator
	}
	// is holds the input iterators until init() moves them onto the heap.
	is []EntryIterator
	// prefetched records that init() has already run.
	prefetched bool
	// currEntry is the entry/labels pair served by Entry() and Labels().
	currEntry entryWithLabels
	// errs accumulates errors observed from failing input iterators.
	errs []error
}
// NewSortEntryIterator returns an EntryIterator that yields entries from the
// given iterators ordered by timestamp (descending for BACKWARD, ascending
// for FORWARD). It only orders entries across the `is` iterators; it does not
// sort entries within an individual iterator, so wrapping a single iterator
// yields exactly that iterator's output.
func NewSortEntryIterator(is []EntryIterator, direction logproto.Direction) EntryIterator {
	// Trivial cases need no heap at all.
	switch len(is) {
	case 0:
		return NoopIterator
	case 1:
		return is[0]
	}

	it := &entrySortIterator{is: is}
	switch direction {
	case logproto.BACKWARD:
		it.heap = &iteratorMaxHeap{iteratorHeap: make([]EntryIterator, 0, len(is))}
	case logproto.FORWARD:
		it.heap = &iteratorMinHeap{iteratorHeap: make([]EntryIterator, 0, len(is))}
	default:
		panic("bad direction")
	}
	return it
}
// init lazily builds the underlying heap from the input iterators. Inputs
// that are already empty (or fail on their first Next) are closed — with any
// error recorded — instead of being pushed. Runs at most once.
func (i *entrySortIterator) init() {
	if i.prefetched {
		return
	}
	i.prefetched = true

	for _, in := range i.is {
		if !in.Next() {
			// Empty or failed input: capture its error and release it.
			if err := in.Error(); err != nil {
				i.errs = append(i.errs, err)
			}
			util.LogError("closing iterator", in.Close)
			continue
		}
		i.heap.Push(in)
	}
	heap.Init(i.heap)

	// Every input is now either on the heap or closed, so the slice can be
	// released for garbage collection.
	i.is = nil
}
// Next advances to the next entry in timestamp order, returning false once
// every input iterator is exhausted. The entry is readable via Entry()/Labels().
func (i *entrySortIterator) Next() bool {
	i.init()

	if i.heap.Len() == 0 {
		return false
	}

	// The heap root always holds the next entry in the requested direction.
	top := i.heap.Peek()
	i.currEntry.entry = top.Entry()
	i.currEntry.labels = top.Labels()

	if top.Next() {
		// The root advanced; restore the heap invariant for its new entry.
		if i.heap.Len() > 1 {
			heap.Fix(i.heap, 0)
		}
		return true
	}

	// The root is exhausted: remove it, record any error, and close it.
	heap.Pop(i.heap)
	if err := top.Error(); err != nil {
		i.errs = append(i.errs, err)
	}
	util.LogError("closing iterator", top.Close)
	return true
}
// Entry implements EntryIterator, returning the entry selected by the last Next().
func (i *entrySortIterator) Entry() logproto.Entry {
	return i.currEntry.entry
}
// Labels implements EntryIterator, returning the labels of the entry selected by the last Next().
func (i *entrySortIterator) Labels() string {
	return i.currEntry.labels
}
// Error returns nil when no input iterator failed, the sole error when
// exactly one did, and a combined error otherwise.
func (i *entrySortIterator) Error() error {
	if len(i.errs) == 0 {
		return nil
	}
	if len(i.errs) == 1 {
		return i.errs[0]
	}
	return util.MultiError(i.errs)
}
// Close closes every iterator still on the heap, stopping at the first
// failure. Inputs already drained by Next() were closed there.
func (i *entrySortIterator) Close() error {
	for i.heap.Len() > 0 {
		inner := i.heap.Pop().(EntryIterator)
		if err := inner.Close(); err != nil {
			return err
		}
	}
	return nil
}
// NewStreamsIterator returns an iterator over logproto.Stream // NewStreamsIterator returns an iterator over logproto.Stream
func NewStreamsIterator(ctx context.Context, streams []logproto.Stream, direction logproto.Direction) EntryIterator { func NewStreamsIterator(streams []logproto.Stream, direction logproto.Direction) EntryIterator {
is := make([]EntryIterator, 0, len(streams)) is := make([]EntryIterator, 0, len(streams))
for i := range streams { for i := range streams {
is = append(is, NewStreamIterator(streams[i])) is = append(is, NewStreamIterator(streams[i]))
} }
return NewHeapIterator(ctx, is, direction) return NewSortEntryIterator(is, direction)
} }
// NewQueryResponseIterator returns an iterator over a QueryResponse. // NewQueryResponseIterator returns an iterator over a QueryResponse.
func NewQueryResponseIterator(ctx context.Context, resp *logproto.QueryResponse, direction logproto.Direction) EntryIterator { func NewQueryResponseIterator(resp *logproto.QueryResponse, direction logproto.Direction) EntryIterator {
is := make([]EntryIterator, 0, len(resp.Streams)) return NewStreamsIterator(resp.Streams, direction)
for i := range resp.Streams {
is = append(is, NewStreamIterator(resp.Streams[i]))
}
return NewHeapIterator(ctx, is, direction)
} }
type queryClientIterator struct { type queryClientIterator struct {
@ -365,7 +474,7 @@ func (i *queryClientIterator) Next() bool {
return false return false
} }
stats.JoinIngesters(ctx, batch.Stats) stats.JoinIngesters(ctx, batch.Stats)
i.curr = NewQueryResponseIterator(ctx, batch, i.direction) i.curr = NewQueryResponseIterator(batch, i.direction)
} }
return true return true

@ -45,7 +45,7 @@ func TestIterator(t *testing.T) {
// Test dedupe of overlapping iterators with the heap iterator. // Test dedupe of overlapping iterators with the heap iterator.
{ {
iterator: NewHeapIterator(context.Background(), []EntryIterator{ iterator: NewMergeEntryIterator(context.Background(), []EntryIterator{
mkStreamIterator(offset(0, identity), defaultLabels), mkStreamIterator(offset(0, identity), defaultLabels),
mkStreamIterator(offset(testSize/2, identity), defaultLabels), mkStreamIterator(offset(testSize/2, identity), defaultLabels),
mkStreamIterator(offset(testSize, identity), defaultLabels), mkStreamIterator(offset(testSize, identity), defaultLabels),
@ -57,7 +57,7 @@ func TestIterator(t *testing.T) {
// Test dedupe of overlapping iterators with the heap iterator (backward). // Test dedupe of overlapping iterators with the heap iterator (backward).
{ {
iterator: NewHeapIterator(context.Background(), []EntryIterator{ iterator: NewMergeEntryIterator(context.Background(), []EntryIterator{
mkStreamIterator(inverse(offset(0, identity)), defaultLabels), mkStreamIterator(inverse(offset(0, identity)), defaultLabels),
mkStreamIterator(inverse(offset(-testSize/2, identity)), defaultLabels), mkStreamIterator(inverse(offset(-testSize/2, identity)), defaultLabels),
mkStreamIterator(inverse(offset(-testSize, identity)), defaultLabels), mkStreamIterator(inverse(offset(-testSize, identity)), defaultLabels),
@ -69,7 +69,7 @@ func TestIterator(t *testing.T) {
// Test dedupe of entries with the same timestamp but different entries. // Test dedupe of entries with the same timestamp but different entries.
{ {
iterator: NewHeapIterator(context.Background(), []EntryIterator{ iterator: NewMergeEntryIterator(context.Background(), []EntryIterator{
mkStreamIterator(offset(0, constant(0)), defaultLabels), mkStreamIterator(offset(0, constant(0)), defaultLabels),
mkStreamIterator(offset(0, constant(0)), defaultLabels), mkStreamIterator(offset(0, constant(0)), defaultLabels),
mkStreamIterator(offset(testSize, constant(0)), defaultLabels), mkStreamIterator(offset(testSize, constant(0)), defaultLabels),
@ -110,7 +110,7 @@ func TestIteratorMultipleLabels(t *testing.T) {
}{ }{
// Test merging with differing labels but same timestamps and values. // Test merging with differing labels but same timestamps and values.
{ {
iterator: NewHeapIterator(context.Background(), []EntryIterator{ iterator: NewMergeEntryIterator(context.Background(), []EntryIterator{
mkStreamIterator(identity, "{foobar: \"baz1\"}"), mkStreamIterator(identity, "{foobar: \"baz1\"}"),
mkStreamIterator(identity, "{foobar: \"baz2\"}"), mkStreamIterator(identity, "{foobar: \"baz2\"}"),
}, logproto.FORWARD), }, logproto.FORWARD),
@ -128,7 +128,7 @@ func TestIteratorMultipleLabels(t *testing.T) {
// Test merging with differing labels but all the same timestamps and different values. // Test merging with differing labels but all the same timestamps and different values.
{ {
iterator: NewHeapIterator(context.Background(), []EntryIterator{ iterator: NewMergeEntryIterator(context.Background(), []EntryIterator{
mkStreamIterator(constant(0), "{foobar: \"baz1\"}"), mkStreamIterator(constant(0), "{foobar: \"baz1\"}"),
mkStreamIterator(constant(0), "{foobar: \"baz2\"}"), mkStreamIterator(constant(0), "{foobar: \"baz2\"}"),
}, logproto.FORWARD), }, logproto.FORWARD),
@ -158,7 +158,7 @@ func TestIteratorMultipleLabels(t *testing.T) {
} }
} }
func TestHeapIteratorPrefetch(t *testing.T) { func TestMergeIteratorPrefetch(t *testing.T) {
t.Parallel() t.Parallel()
type tester func(t *testing.T, i HeapIterator) type tester func(t *testing.T, i HeapIterator)
@ -182,7 +182,7 @@ func TestHeapIteratorPrefetch(t *testing.T) {
t.Run(testName, func(t *testing.T) { t.Run(testName, func(t *testing.T) {
t.Parallel() t.Parallel()
i := NewHeapIterator(context.Background(), []EntryIterator{ i := NewMergeEntryIterator(context.Background(), []EntryIterator{
mkStreamIterator(identity, "{foobar: \"baz1\"}"), mkStreamIterator(identity, "{foobar: \"baz1\"}"),
mkStreamIterator(identity, "{foobar: \"baz2\"}"), mkStreamIterator(identity, "{foobar: \"baz2\"}"),
}, logproto.FORWARD) }, logproto.FORWARD)
@ -234,7 +234,7 @@ func inverse(g generator) generator {
} }
} }
func TestHeapIteratorDeduplication(t *testing.T) { func TestMergeIteratorDeduplication(t *testing.T) {
foo := logproto.Stream{ foo := logproto.Stream{
Labels: `{app="foo"}`, Labels: `{app="foo"}`,
Entries: []logproto.Entry{ Entries: []logproto.Entry{
@ -272,7 +272,7 @@ func TestHeapIteratorDeduplication(t *testing.T) {
require.NoError(t, it.Error()) require.NoError(t, it.Error())
} }
// forward iteration // forward iteration
it := NewHeapIterator(context.Background(), []EntryIterator{ it := NewMergeEntryIterator(context.Background(), []EntryIterator{
NewStreamIterator(foo), NewStreamIterator(foo),
NewStreamIterator(bar), NewStreamIterator(bar),
NewStreamIterator(foo), NewStreamIterator(foo),
@ -284,7 +284,7 @@ func TestHeapIteratorDeduplication(t *testing.T) {
assertIt(it, false, len(foo.Entries)) assertIt(it, false, len(foo.Entries))
// backward iteration // backward iteration
it = NewHeapIterator(context.Background(), []EntryIterator{ it = NewMergeEntryIterator(context.Background(), []EntryIterator{
mustReverseStreamIterator(NewStreamIterator(foo)), mustReverseStreamIterator(NewStreamIterator(foo)),
mustReverseStreamIterator(NewStreamIterator(bar)), mustReverseStreamIterator(NewStreamIterator(bar)),
mustReverseStreamIterator(NewStreamIterator(foo)), mustReverseStreamIterator(NewStreamIterator(foo)),
@ -308,8 +308,8 @@ func TestReverseIterator(t *testing.T) {
itr1 := mkStreamIterator(inverse(offset(testSize, identity)), defaultLabels) itr1 := mkStreamIterator(inverse(offset(testSize, identity)), defaultLabels)
itr2 := mkStreamIterator(inverse(offset(testSize, identity)), "{foobar: \"bazbar\"}") itr2 := mkStreamIterator(inverse(offset(testSize, identity)), "{foobar: \"bazbar\"}")
heapIterator := NewHeapIterator(context.Background(), []EntryIterator{itr1, itr2}, logproto.BACKWARD) mergeIterator := NewMergeEntryIterator(context.Background(), []EntryIterator{itr1, itr2}, logproto.BACKWARD)
reversedIter, err := NewReversedIter(heapIterator, testSize, false) reversedIter, err := NewReversedIter(mergeIterator, testSize, false)
require.NoError(t, err) require.NoError(t, err)
for i := int64((testSize / 2) + 1); i <= testSize; i++ { for i := int64((testSize / 2) + 1); i <= testSize; i++ {
@ -347,8 +347,8 @@ func TestReverseEntryIteratorUnlimited(t *testing.T) {
itr1 := mkStreamIterator(offset(testSize, identity), defaultLabels) itr1 := mkStreamIterator(offset(testSize, identity), defaultLabels)
itr2 := mkStreamIterator(offset(testSize, identity), "{foobar: \"bazbar\"}") itr2 := mkStreamIterator(offset(testSize, identity), "{foobar: \"bazbar\"}")
heapIterator := NewHeapIterator(context.Background(), []EntryIterator{itr1, itr2}, logproto.BACKWARD) mergeIterator := NewMergeEntryIterator(context.Background(), []EntryIterator{itr1, itr2}, logproto.BACKWARD)
reversedIter, err := NewReversedIter(heapIterator, 0, false) reversedIter, err := NewReversedIter(mergeIterator, 0, false)
require.NoError(t, err) require.NoError(t, err)
var ct int var ct int
@ -546,7 +546,7 @@ func Test_DuplicateCount(t *testing.T) {
} { } {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
_, ctx := stats.NewContext(context.Background()) _, ctx := stats.NewContext(context.Background())
it := NewHeapIterator(ctx, test.iters, test.direction) it := NewMergeEntryIterator(ctx, test.iters, test.direction)
defer it.Close() defer it.Close()
for it.Next() { for it.Next() {
} }
@ -636,7 +636,7 @@ func TestNonOverlappingClose(t *testing.T) {
require.Equal(t, true, b.closed.Load()) require.Equal(t, true, b.closed.Load())
} }
func BenchmarkHeapIterator(b *testing.B) { func BenchmarkSortIterator(b *testing.B) {
var ( var (
ctx = context.Background() ctx = context.Background()
streams []logproto.Stream streams []logproto.Stream
@ -658,18 +658,130 @@ func BenchmarkHeapIterator(b *testing.B) {
streams[i], streams[j] = streams[j], streams[i] streams[i], streams[j] = streams[j], streams[i]
}) })
b.ResetTimer() b.Run("merge sort", func(b *testing.B) {
for i := 0; i < b.N; i++ { b.ResetTimer()
b.StopTimer() for i := 0; i < b.N; i++ {
var itrs []EntryIterator b.StopTimer()
for i := 0; i < streamsCount; i++ { var itrs []EntryIterator
itrs = append(itrs, NewStreamIterator(streams[i])) for i := 0; i < streamsCount; i++ {
itrs = append(itrs, NewStreamIterator(streams[i]))
}
b.StartTimer()
it := NewMergeEntryIterator(ctx, itrs, logproto.BACKWARD)
for it.Next() {
it.Entry()
}
it.Close()
} }
b.StartTimer() })
it := NewHeapIterator(ctx, itrs, logproto.BACKWARD)
b.Run("sort", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
var itrs []EntryIterator
for i := 0; i < streamsCount; i++ {
itrs = append(itrs, NewStreamIterator(streams[i]))
}
b.StartTimer()
it := NewSortEntryIterator(itrs, logproto.BACKWARD)
for it.Next() {
it.Entry()
}
it.Close()
}
})
}
func Test_EntrySortIterator(t *testing.T) {
t.Run("backward", func(t *testing.T) {
t.Parallel()
it := NewSortEntryIterator(
[]EntryIterator{
NewStreamIterator(logproto.Stream{
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 5)},
{Timestamp: time.Unix(0, 3)},
{Timestamp: time.Unix(0, 0)},
},
Labels: `{foo="bar"}`,
}),
NewStreamIterator(logproto.Stream{
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 4)},
{Timestamp: time.Unix(0, 2)},
{Timestamp: time.Unix(0, 1)},
},
Labels: `{foo="buzz"}`,
}),
}, logproto.BACKWARD)
var i int64 = 5
defer it.Close()
for it.Next() { for it.Next() {
it.Entry() require.Equal(t, time.Unix(0, i), it.Entry().Timestamp)
i--
} }
it.Close() })
} t.Run("forward", func(t *testing.T) {
t.Parallel()
it := NewSortEntryIterator(
[]EntryIterator{
NewStreamIterator(logproto.Stream{
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 0)},
{Timestamp: time.Unix(0, 3)},
{Timestamp: time.Unix(0, 5)},
},
Labels: `{foo="bar"}`,
}),
NewStreamIterator(logproto.Stream{
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 1)},
{Timestamp: time.Unix(0, 2)},
{Timestamp: time.Unix(0, 4)},
},
Labels: `{foo="buzz"}`,
}),
}, logproto.FORWARD)
var i int64
defer it.Close()
for it.Next() {
require.Equal(t, time.Unix(0, i), it.Entry().Timestamp)
i++
}
})
t.Run("forward sort by stream", func(t *testing.T) {
t.Parallel()
it := NewSortEntryIterator(
[]EntryIterator{
NewStreamIterator(logproto.Stream{
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 0)},
{Timestamp: time.Unix(0, 3)},
{Timestamp: time.Unix(0, 5)},
},
Labels: `b`,
}),
NewStreamIterator(logproto.Stream{
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 0)},
{Timestamp: time.Unix(0, 1)},
{Timestamp: time.Unix(0, 2)},
{Timestamp: time.Unix(0, 4)},
},
Labels: `a`,
}),
}, logproto.FORWARD)
// The first entry appears in both so we expect it to be sorted by Labels.
require.True(t, it.Next())
require.Equal(t, time.Unix(0, 0), it.Entry().Timestamp)
require.Equal(t, `a`, it.Labels())
var i int64
defer it.Close()
for it.Next() {
require.Equal(t, time.Unix(0, i), it.Entry().Timestamp)
i++
}
})
} }

@ -138,24 +138,25 @@ func (h sampleIteratorHeap) Less(i, j int) bool {
} }
} }
// heapSampleIterator iterates over a heap of iterators. // mergeSampleIterator iterates over a heap of iterators by merging samples.
type heapSampleIterator struct { type mergeSampleIterator struct {
heap *sampleIteratorHeap heap *sampleIteratorHeap
is []SampleIterator is []SampleIterator
prefetched bool prefetched bool
stats *stats.Context stats *stats.Context
tuples []sampletuple tuples []sampletuple
curr logproto.Sample curr sampleWithLabels
currLabels string errs []error
errs []error
} }
// NewHeapSampleIterator returns a new iterator which uses a heap to merge together // NewMergeSampleIterator returns a new iterator which uses a heap to merge together samples for multiple iterators and deduplicate if any.
// entries for multiple iterators. // The iterator only order and merge entries across given `is` iterators, it does not merge entries within individual iterator.
func NewHeapSampleIterator(ctx context.Context, is []SampleIterator) SampleIterator { // This means using this iterator with a single iterator will result in the same result as the input iterator.
// If you don't need to deduplicate sample, use `NewSortSampleIterator` instead.
func NewMergeSampleIterator(ctx context.Context, is []SampleIterator) SampleIterator {
h := sampleIteratorHeap(make([]SampleIterator, 0, len(is))) h := sampleIteratorHeap(make([]SampleIterator, 0, len(is)))
return &heapSampleIterator{ return &mergeSampleIterator{
stats: stats.FromContext(ctx), stats: stats.FromContext(ctx),
is: is, is: is,
heap: &h, heap: &h,
@ -166,7 +167,7 @@ func NewHeapSampleIterator(ctx context.Context, is []SampleIterator) SampleItera
// prefetch iterates over all inner iterators to merge together, calls Next() on // prefetch iterates over all inner iterators to merge together, calls Next() on
// each of them to prefetch the first entry and pushes of them - who are not // each of them to prefetch the first entry and pushes of them - who are not
// empty - to the heap // empty - to the heap
func (i *heapSampleIterator) prefetch() { func (i *mergeSampleIterator) prefetch() {
if i.prefetched { if i.prefetched {
return return
} }
@ -187,7 +188,7 @@ func (i *heapSampleIterator) prefetch() {
// //
// If the iterator has no more entries or an error occur while advancing it, the iterator // If the iterator has no more entries or an error occur while advancing it, the iterator
// is not pushed to the heap and any possible error captured, so that can be get via Error(). // is not pushed to the heap and any possible error captured, so that can be get via Error().
func (i *heapSampleIterator) requeue(ei SampleIterator, advanced bool) { func (i *mergeSampleIterator) requeue(ei SampleIterator, advanced bool) {
if advanced || ei.Next() { if advanced || ei.Next() {
heap.Push(i.heap, ei) heap.Push(i.heap, ei)
return return
@ -204,7 +205,7 @@ type sampletuple struct {
SampleIterator SampleIterator
} }
func (i *heapSampleIterator) Next() bool { func (i *mergeSampleIterator) Next() bool {
i.prefetch() i.prefetch()
if i.heap.Len() == 0 { if i.heap.Len() == 0 {
@ -213,8 +214,8 @@ func (i *heapSampleIterator) Next() bool {
// shortcut for the last iterator. // shortcut for the last iterator.
if i.heap.Len() == 1 { if i.heap.Len() == 1 {
i.curr = i.heap.Peek().Sample() i.curr.Sample = i.heap.Peek().Sample()
i.currLabels = i.heap.Peek().Labels() i.curr.labels = i.heap.Peek().Labels()
if !i.heap.Peek().Next() { if !i.heap.Peek().Next() {
i.heap.Pop() i.heap.Pop()
} }
@ -239,8 +240,8 @@ func (i *heapSampleIterator) Next() bool {
}) })
} }
i.curr = i.tuples[0].Sample i.curr.Sample = i.tuples[0].Sample
i.currLabels = i.tuples[0].Labels() i.curr.labels = i.tuples[0].Labels()
t := i.tuples[0] t := i.tuples[0]
if len(i.tuples) == 1 { if len(i.tuples) == 1 {
i.requeue(i.tuples[0].SampleIterator, false) i.requeue(i.tuples[0].SampleIterator, false)
@ -263,15 +264,15 @@ func (i *heapSampleIterator) Next() bool {
return true return true
} }
func (i *heapSampleIterator) Sample() logproto.Sample { func (i *mergeSampleIterator) Sample() logproto.Sample {
return i.curr return i.curr.Sample
} }
func (i *heapSampleIterator) Labels() string { func (i *mergeSampleIterator) Labels() string {
return i.currLabels return i.curr.labels
} }
func (i *heapSampleIterator) Error() error { func (i *mergeSampleIterator) Error() error {
switch len(i.errs) { switch len(i.errs) {
case 0: case 0:
return nil return nil
@ -282,7 +283,7 @@ func (i *heapSampleIterator) Error() error {
} }
} }
func (i *heapSampleIterator) Close() error { func (i *mergeSampleIterator) Close() error {
for i.heap.Len() > 0 { for i.heap.Len() > 0 {
if err := i.heap.Pop().(SampleIterator).Close(); err != nil { if err := i.heap.Pop().(SampleIterator).Close(); err != nil {
return err return err
@ -292,6 +293,111 @@ func (i *heapSampleIterator) Close() error {
return nil return nil
} }
// sortSampleIterator iterates over a heap of iterators by sorting samples.
type sortSampleIterator struct {
heap *sampleIteratorHeap
is []SampleIterator
prefetched bool
curr sampleWithLabels
errs []error
}
// NewSortSampleIterator returns a new SampleIterator that sorts samples by ascending timestamp the input iterators.
// The iterator only order sample across given `is` iterators, it does not sort samples within individual iterator.
// This means using this iterator with a single iterator will result in the same result as the input iterator.
func NewSortSampleIterator(is []SampleIterator) SampleIterator {
if len(is) == 0 {
return NoopIterator
}
if len(is) == 1 {
return is[0]
}
h := sampleIteratorHeap(make([]SampleIterator, 0, len(is)))
return &sortSampleIterator{
is: is,
heap: &h,
}
}
// init initialize the underlaying heap
func (i *sortSampleIterator) init() {
if i.prefetched {
return
}
i.prefetched = true
for _, it := range i.is {
if it.Next() {
i.heap.Push(it)
continue
}
if err := it.Error(); err != nil {
i.errs = append(i.errs, err)
}
util.LogError("closing iterator", it.Close)
}
heap.Init(i.heap)
// We can now clear the list of input iterators to merge, given they have all
// been processed and the non empty ones have been pushed to the heap
i.is = nil
}
func (i *sortSampleIterator) Next() bool {
i.init()
if i.heap.Len() == 0 {
return false
}
next := i.heap.Peek()
i.curr.Sample = next.Sample()
i.curr.labels = next.Labels()
// if the top iterator is empty, we remove it.
if !next.Next() {
heap.Pop(i.heap)
if err := next.Error(); err != nil {
i.errs = append(i.errs, err)
}
util.LogError("closing iterator", next.Close)
return true
}
if i.heap.Len() > 1 {
heap.Fix(i.heap, 0)
}
return true
}
func (i *sortSampleIterator) Sample() logproto.Sample {
return i.curr.Sample
}
func (i *sortSampleIterator) Labels() string {
return i.curr.labels
}
func (i *sortSampleIterator) Error() error {
switch len(i.errs) {
case 0:
return nil
case 1:
return i.errs[0]
default:
return util.MultiError(i.errs)
}
}
func (i *sortSampleIterator) Close() error {
for i.heap.Len() > 0 {
if err := i.heap.Pop().(SampleIterator).Close(); err != nil {
return err
}
}
return nil
}
type sampleQueryClientIterator struct { type sampleQueryClientIterator struct {
client QuerySampleClient client QuerySampleClient
err error err error
@ -323,7 +429,7 @@ func (i *sampleQueryClientIterator) Next() bool {
return false return false
} }
stats.JoinIngesters(ctx, batch.Stats) stats.JoinIngesters(ctx, batch.Stats)
i.curr = NewSampleQueryResponseIterator(ctx, batch) i.curr = NewSampleQueryResponseIterator(batch)
} }
return true return true
} }
@ -345,8 +451,8 @@ func (i *sampleQueryClientIterator) Close() error {
} }
// NewSampleQueryResponseIterator returns an iterator over a SampleQueryResponse. // NewSampleQueryResponseIterator returns an iterator over a SampleQueryResponse.
func NewSampleQueryResponseIterator(ctx context.Context, resp *logproto.SampleQueryResponse) SampleIterator { func NewSampleQueryResponseIterator(resp *logproto.SampleQueryResponse) SampleIterator {
return NewMultiSeriesIterator(ctx, resp.Series) return NewMultiSeriesIterator(resp.Series)
} }
type seriesIterator struct { type seriesIterator struct {
@ -386,12 +492,12 @@ func SampleIteratorWithClose(it SampleIterator, closeFn func() error) SampleIter
} }
// NewMultiSeriesIterator returns an iterator over multiple logproto.Series // NewMultiSeriesIterator returns an iterator over multiple logproto.Series
func NewMultiSeriesIterator(ctx context.Context, series []logproto.Series) SampleIterator { func NewMultiSeriesIterator(series []logproto.Series) SampleIterator {
is := make([]SampleIterator, 0, len(series)) is := make([]SampleIterator, 0, len(series))
for i := range series { for i := range series {
is = append(is, NewSeriesIterator(series[i])) is = append(is, NewSeriesIterator(series[i]))
} }
return NewHeapSampleIterator(ctx, is) return NewSortSampleIterator(is)
} }
// NewSeriesIterator iterates over sample in a series. // NewSeriesIterator iterates over sample in a series.

@ -104,8 +104,8 @@ var carSeries = logproto.Series{
}, },
} }
func TestNewHeapSampleIterator(t *testing.T) { func TestNewMergeSampleIterator(t *testing.T) {
it := NewHeapSampleIterator(context.Background(), it := NewMergeSampleIterator(context.Background(),
[]SampleIterator{ []SampleIterator{
NewSeriesIterator(varSeries), NewSeriesIterator(varSeries),
NewSeriesIterator(carSeries), NewSeriesIterator(carSeries),
@ -194,7 +194,7 @@ func TestReadSampleBatch(t *testing.T) {
require.Equal(t, uint32(1), size) require.Equal(t, uint32(1), size)
require.NoError(t, err) require.NoError(t, err)
res, size, err = ReadSampleBatch(NewMultiSeriesIterator(context.Background(), []logproto.Series{carSeries, varSeries}), 100) res, size, err = ReadSampleBatch(NewMultiSeriesIterator([]logproto.Series{carSeries, varSeries}), 100)
require.ElementsMatch(t, []logproto.Series{carSeries, varSeries}, res.Series) require.ElementsMatch(t, []logproto.Series{carSeries, varSeries}, res.Series)
require.Equal(t, uint32(6), size) require.Equal(t, uint32(6), size)
require.NoError(t, err) require.NoError(t, err)
@ -277,7 +277,7 @@ func TestSampleIteratorWithClose_ReturnsError(t *testing.T) {
assert.Equal(t, err, err2) assert.Equal(t, err, err2)
} }
func BenchmarkHeapSampleIterator(b *testing.B) { func BenchmarkSortSampleIterator(b *testing.B) {
var ( var (
ctx = context.Background() ctx = context.Background()
series []logproto.Series series []logproto.Series
@ -299,18 +299,102 @@ func BenchmarkHeapSampleIterator(b *testing.B) {
series[i], series[j] = series[j], series[i] series[i], series[j] = series[j], series[i]
}) })
b.ResetTimer() b.Run("merge", func(b *testing.B) {
for i := 0; i < b.N; i++ { b.ResetTimer()
b.StopTimer() for i := 0; i < b.N; i++ {
var itrs []SampleIterator b.StopTimer()
for i := 0; i < seriesCount; i++ { var itrs []SampleIterator
itrs = append(itrs, NewSeriesIterator(series[i])) for i := 0; i < seriesCount; i++ {
itrs = append(itrs, NewSeriesIterator(series[i]))
}
b.StartTimer()
it := NewMergeSampleIterator(ctx, itrs)
for it.Next() {
it.Sample()
}
it.Close()
} }
b.StartTimer() })
it := NewHeapSampleIterator(ctx, itrs) b.Run("sort", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
var itrs []SampleIterator
for i := 0; i < seriesCount; i++ {
itrs = append(itrs, NewSeriesIterator(series[i]))
}
b.StartTimer()
it := NewSortSampleIterator(itrs)
for it.Next() {
it.Sample()
}
it.Close()
}
})
}
func Test_SampleSortIterator(t *testing.T) {
t.Run("forward", func(t *testing.T) {
t.Parallel()
it := NewSortSampleIterator(
[]SampleIterator{
NewSeriesIterator(logproto.Series{
Samples: []logproto.Sample{
{Timestamp: 0},
{Timestamp: 3},
{Timestamp: 5},
},
Labels: `{foo="bar"}`,
}),
NewSeriesIterator(logproto.Series{
Samples: []logproto.Sample{
{Timestamp: 1},
{Timestamp: 2},
{Timestamp: 4},
},
Labels: `{foo="bar"}`,
}),
})
var i int64
defer it.Close()
for it.Next() { for it.Next() {
it.Sample() require.Equal(t, i, it.Sample().Timestamp)
i++
} }
it.Close() })
} t.Run("forward sort by stream", func(t *testing.T) {
t.Parallel()
it := NewSortSampleIterator(
[]SampleIterator{
NewSeriesIterator(logproto.Series{
Samples: []logproto.Sample{
{Timestamp: 0},
{Timestamp: 3},
{Timestamp: 5},
},
Labels: `b`,
}),
NewSeriesIterator(logproto.Series{
Samples: []logproto.Sample{
{Timestamp: 0},
{Timestamp: 1},
{Timestamp: 2},
{Timestamp: 4},
},
Labels: `a`,
}),
})
// The first entry appears in both so we expect it to be sorted by Labels.
require.True(t, it.Next())
require.Equal(t, int64(0), it.Sample().Timestamp)
require.Equal(t, `a`, it.Labels())
var i int64
defer it.Close()
for it.Next() {
require.Equal(t, i, it.Sample().Timestamp)
i++
}
})
} }

@ -32,9 +32,7 @@ const (
defaultMaxFileSize = 20 * (1 << 20) // 20MB defaultMaxFileSize = 20 * (1 << 20) // 20MB
) )
var ( var ErrNotSupported = errors.New("not supported")
ErrNotSupported = errors.New("not supported")
)
// FileClient is a type of LogCLI client that do LogQL on log lines from // FileClient is a type of LogCLI client that do LogQL on log lines from
// the given file directly, instead get log lines from Loki servers. // the given file directly, instead get log lines from Loki servers.
@ -63,7 +61,6 @@ func NewFileClient(r io.ReadCloser) *FileClient {
labels: []string{defaultLabelKey}, labels: []string{defaultLabelKey},
labelValues: []string{defaultLabelValue}, labelValues: []string{defaultLabelValue},
} }
} }
func (f *FileClient) Query(q string, limit int, t time.Time, direction logproto.Direction, quiet bool) (*loghttp.QueryResponse, error) { func (f *FileClient) Query(q string, limit int, t time.Time, direction logproto.Direction, quiet bool) (*loghttp.QueryResponse, error) {
@ -198,7 +195,7 @@ type querier struct {
labels labels.Labels labels labels.Labels
} }
func (q *querier) SelectLogs(ctx context.Context, params logql.SelectLogParams) (iter.EntryIterator, error) { func (q *querier) SelectLogs(_ context.Context, params logql.SelectLogParams) (iter.EntryIterator, error) {
expr, err := params.LogSelector() expr, err := params.LogSelector()
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to extract selector for logs: %w", err) return nil, fmt.Errorf("failed to extract selector for logs: %w", err)
@ -207,7 +204,7 @@ func (q *querier) SelectLogs(ctx context.Context, params logql.SelectLogParams)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to extract pipeline for logs: %w", err) return nil, fmt.Errorf("failed to extract pipeline for logs: %w", err)
} }
return newFileIterator(ctx, q.r, params, pipeline.ForStream(q.labels)) return newFileIterator(q.r, params, pipeline.ForStream(q.labels))
} }
func (q *querier) SelectSamples(ctx context.Context, params logql.SelectSampleParams) (iter.SampleIterator, error) { func (q *querier) SelectSamples(ctx context.Context, params logql.SelectSampleParams) (iter.SampleIterator, error) {
@ -215,7 +212,6 @@ func (q *querier) SelectSamples(ctx context.Context, params logql.SelectSamplePa
} }
func newFileIterator( func newFileIterator(
ctx context.Context,
r io.Reader, r io.Reader,
params logql.SelectLogParams, params logql.SelectLogParams,
pipeline logqllog.StreamPipeline, pipeline logqllog.StreamPipeline,
@ -278,7 +274,6 @@ func newFileIterator(
} }
return iter.NewStreamsIterator( return iter.NewStreamsIterator(
ctx,
streamResult, streamResult,
params.Direction, params.Direction,
), nil ), nil

@ -1340,7 +1340,7 @@ func TestEngine_RangeQuery(t *testing.T) {
`topk(1,rate(({app=~"foo|bar"} |~".+bar")[1m])) by (app)`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, `topk(1,rate(({app=~"foo|bar"} |~".+bar")[1m])) by (app)`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100,
[][]logproto.Series{ [][]logproto.Series{
{ {
newSeries(testSize, factor(10, identity), `{app="foo"}`), newSeries(testSize, factor(15, identity), `{app="fuzz"}`), newSeries(testSize, factor(10, identity), `{app="foo"}`),
newSeries(testSize, factor(5, identity), `{app="fuzz"}`), newSeries(testSize, identity, `{app="buzz"}`), newSeries(testSize, factor(5, identity), `{app="fuzz"}`), newSeries(testSize, identity, `{app="buzz"}`),
}, },
}, },
@ -2085,11 +2085,11 @@ type errorIteratorQuerier struct {
} }
func (e errorIteratorQuerier) SelectLogs(ctx context.Context, p SelectLogParams) (iter.EntryIterator, error) { func (e errorIteratorQuerier) SelectLogs(ctx context.Context, p SelectLogParams) (iter.EntryIterator, error) {
return iter.NewHeapIterator(ctx, e.entries, p.Direction), nil return iter.NewSortEntryIterator(e.entries, p.Direction), nil
} }
func (e errorIteratorQuerier) SelectSamples(ctx context.Context, p SelectSampleParams) (iter.SampleIterator, error) { func (e errorIteratorQuerier) SelectSamples(ctx context.Context, p SelectSampleParams) (iter.SampleIterator, error) {
return iter.NewHeapSampleIterator(ctx, e.samples), nil return iter.NewSortSampleIterator(e.samples), nil
} }
func TestStepEvaluator_Error(t *testing.T) { func TestStepEvaluator_Error(t *testing.T) {
@ -2269,11 +2269,6 @@ func getLocalQuerier(size int64) Querier {
newSeries(size, identity, `{app="bar",bar="foo"}`), newSeries(size, identity, `{app="bar",bar="foo"}`),
newSeries(size, identity, `{app="bar",bar="bazz"}`), newSeries(size, identity, `{app="bar",bar="bazz"}`),
newSeries(size, identity, `{app="bar",bar="fuzz"}`), newSeries(size, identity, `{app="bar",bar="fuzz"}`),
// some duplicates
newSeries(size, identity, `{app="foo"}`),
newSeries(size, identity, `{app="bar"}`),
newSeries(size, identity, `{app="bar",bar="bazz"}`),
newSeries(size, identity, `{app="bar"}`),
}, },
}, },
streams: map[string][]logproto.Stream{ streams: map[string][]logproto.Stream{
@ -2286,11 +2281,6 @@ func getLocalQuerier(size int64) Querier {
newStream(size, identity, `{app="bar",bar="foo"}`), newStream(size, identity, `{app="bar",bar="foo"}`),
newStream(size, identity, `{app="bar",bar="bazz"}`), newStream(size, identity, `{app="bar",bar="bazz"}`),
newStream(size, identity, `{app="bar",bar="fuzz"}`), newStream(size, identity, `{app="bar",bar="fuzz"}`),
// some duplicates
newStream(size, identity, `{app="foo"}`),
newStream(size, identity, `{app="bar"}`),
newStream(size, identity, `{app="bar",bar="bazz"}`),
newStream(size, identity, `{app="bar"}`),
}, },
}, },
} }
@ -2331,7 +2321,7 @@ func newQuerierRecorder(t *testing.T, data interface{}, params interface{}) *que
func (q *querierRecorder) SelectLogs(ctx context.Context, p SelectLogParams) (iter.EntryIterator, error) { func (q *querierRecorder) SelectLogs(ctx context.Context, p SelectLogParams) (iter.EntryIterator, error) {
if !q.match { if !q.match {
for _, s := range q.streams { for _, s := range q.streams {
return iter.NewStreamsIterator(ctx, s, p.Direction), nil return iter.NewStreamsIterator(s, p.Direction), nil
} }
} }
recordID := paramsID(p) recordID := paramsID(p)
@ -2339,17 +2329,13 @@ func (q *querierRecorder) SelectLogs(ctx context.Context, p SelectLogParams) (it
if !ok { if !ok {
return nil, fmt.Errorf("no streams found for id: %s has: %+v", recordID, q.streams) return nil, fmt.Errorf("no streams found for id: %s has: %+v", recordID, q.streams)
} }
iters := make([]iter.EntryIterator, 0, len(streams)) return iter.NewStreamsIterator(streams, p.Direction), nil
for _, s := range streams {
iters = append(iters, iter.NewStreamIterator(s))
}
return iter.NewHeapIterator(ctx, iters, p.Direction), nil
} }
func (q *querierRecorder) SelectSamples(ctx context.Context, p SelectSampleParams) (iter.SampleIterator, error) { func (q *querierRecorder) SelectSamples(ctx context.Context, p SelectSampleParams) (iter.SampleIterator, error) {
if !q.match { if !q.match {
for _, s := range q.series { for _, s := range q.series {
return iter.NewMultiSeriesIterator(ctx, s), nil return iter.NewMultiSeriesIterator(s), nil
} }
} }
recordID := paramsID(p) recordID := paramsID(p)
@ -2360,11 +2346,7 @@ func (q *querierRecorder) SelectSamples(ctx context.Context, p SelectSampleParam
if !ok { if !ok {
return nil, fmt.Errorf("no series found for id: %s has: %+v", recordID, q.series) return nil, fmt.Errorf("no series found for id: %s has: %+v", recordID, q.series)
} }
iters := make([]iter.SampleIterator, 0, len(series)) return iter.NewMultiSeriesIterator(series), nil
for _, s := range series {
iters = append(iters, iter.NewSeriesIterator(s))
}
return iter.NewHeapSampleIterator(ctx, iters), nil
} }
func paramsID(p interface{}) string { func paramsID(p interface{}) string {

@ -34,7 +34,7 @@ var (
) )
func newSampleIterator() iter.SampleIterator { func newSampleIterator() iter.SampleIterator {
return iter.NewHeapSampleIterator(context.Background(), []iter.SampleIterator{ return iter.NewSortSampleIterator([]iter.SampleIterator{
iter.NewSeriesIterator(logproto.Series{ iter.NewSeriesIterator(logproto.Series{
Labels: labelFoo.String(), Labels: labelFoo.String(),
Samples: samples, Samples: samples,

@ -324,7 +324,7 @@ func (ev *DownstreamEvaluator) Iterator(
xs = append(xs, iter) xs = append(xs, iter)
} }
return iter.NewHeapIterator(ctx, xs, params.Direction()), nil return iter.NewSortEntryIterator(xs, params.Direction()), nil
default: default:
return nil, EvaluatorUnsupportedType(expr, ev) return nil, EvaluatorUnsupportedType(expr, ev)
@ -401,5 +401,5 @@ func ResultIterator(res logqlmodel.Result, params Params) (iter.EntryIterator, e
if !ok { if !ok {
return nil, fmt.Errorf("unexpected type (%s) for ResultIterator; expected %s", res.Data.Type(), logqlmodel.ValueTypeStreams) return nil, fmt.Errorf("unexpected type (%s) for ResultIterator; expected %s", res.Data.Type(), logqlmodel.ValueTypeStreams)
} }
return iter.NewStreamsIterator(context.Background(), streams, params.Direction()), nil return iter.NewStreamsIterator(streams, params.Direction()), nil
} }

@ -90,7 +90,7 @@ outer:
} }
} }
return iter.NewHeapIterator(ctx, streamIters, req.Direction), nil return iter.NewSortEntryIterator(streamIters, req.Direction), nil
} }
func processStream(in []logproto.Stream, pipeline log.Pipeline) []logproto.Stream { func processStream(in []logproto.Stream, pipeline log.Pipeline) []logproto.Stream {
@ -200,7 +200,7 @@ outer:
filtered := processSeries(matched, extractor) filtered := processSeries(matched, extractor)
return iter.NewTimeRangedSampleIterator( return iter.NewTimeRangedSampleIterator(
iter.NewMultiSeriesIterator(ctx, filtered), iter.NewMultiSeriesIterator(filtered),
req.Start.UnixNano(), req.Start.UnixNano(),
req.End.UnixNano()+1, req.End.UnixNano()+1,
), nil ), nil

@ -142,8 +142,10 @@ func (q *Querier) SelectLogs(ctx context.Context, params logql.SelectLogParams)
iters = append(iters, storeIter) iters = append(iters, storeIter)
} }
if len(iters) == 1 {
return iter.NewHeapIterator(ctx, iters, params.Direction), nil return iters[0], nil
}
return iter.NewMergeEntryIterator(ctx, iters, params.Direction), nil
} }
func (q *Querier) SelectSamples(ctx context.Context, params logql.SelectSampleParams) (iter.SampleIterator, error) { func (q *Querier) SelectSamples(ctx context.Context, params logql.SelectSampleParams) (iter.SampleIterator, error) {
@ -185,7 +187,7 @@ func (q *Querier) SelectSamples(ctx context.Context, params logql.SelectSamplePa
iters = append(iters, storeIter) iters = append(iters, storeIter)
} }
return iter.NewHeapSampleIterator(ctx, iters), nil return iter.NewMergeSampleIterator(ctx, iters), nil
} }
func (q *Querier) buildQueryIntervals(queryStart, queryEnd time.Time) (*interval, *interval) { func (q *Querier) buildQueryIntervals(queryStart, queryEnd time.Time) (*interval, *interval) {

@ -272,7 +272,7 @@ func newTailer(
waitEntryThrottle time.Duration, waitEntryThrottle time.Duration,
) *Tailer { ) *Tailer {
t := Tailer{ t := Tailer{
openStreamIterator: iter.NewHeapIterator(context.Background(), []iter.EntryIterator{historicEntries}, logproto.FORWARD), openStreamIterator: iter.NewMergeEntryIterator(context.Background(), []iter.EntryIterator{historicEntries}, logproto.FORWARD),
querierTailClients: querierTailClients, querierTailClients: querierTailClients,
delayFor: delayFor, delayFor: delayFor,
responseChan: make(chan *loghttp.TailResponse, maxBufferedTailResponses), responseChan: make(chan *loghttp.TailResponse, maxBufferedTailResponses),

@ -396,8 +396,10 @@ func (it *logBatchIterator) newChunksIterator(b *chunkBatch) (iter.EntryIterator
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(iters) == 1 {
return iter.NewHeapIterator(it.ctx, iters, it.direction), nil return iters[0], nil
}
return iter.NewSortEntryIterator(iters, it.direction), nil
} }
func (it *logBatchIterator) buildIterators(chks map[model.Fingerprint][][]*LazyChunk, from, through time.Time, nextChunk *LazyChunk) ([]iter.EntryIterator, error) { func (it *logBatchIterator) buildIterators(chks map[model.Fingerprint][][]*LazyChunk, from, through time.Time, nextChunk *LazyChunk) ([]iter.EntryIterator, error) {
@ -440,7 +442,7 @@ func (it *logBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, through
result = append(result, iter.NewNonOverlappingIterator(iterators, "")) result = append(result, iter.NewNonOverlappingIterator(iterators, ""))
} }
return iter.NewHeapIterator(it.ctx, result, it.direction), nil return iter.NewMergeEntryIterator(it.ctx, result, it.direction), nil
} }
type sampleBatchIterator struct { type sampleBatchIterator struct {
@ -537,7 +539,7 @@ func (it *sampleBatchIterator) newChunksIterator(b *chunkBatch) (iter.SampleIter
return nil, err return nil, err
} }
return iter.NewHeapSampleIterator(it.ctx, iters), nil return iter.NewSortSampleIterator(iters), nil
} }
func (it *sampleBatchIterator) buildIterators(chks map[model.Fingerprint][][]*LazyChunk, from, through time.Time, nextChunk *LazyChunk) ([]iter.SampleIterator, error) { func (it *sampleBatchIterator) buildIterators(chks map[model.Fingerprint][][]*LazyChunk, from, through time.Time, nextChunk *LazyChunk) ([]iter.SampleIterator, error) {
@ -574,7 +576,7 @@ func (it *sampleBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, thro
result = append(result, iter.NewNonOverlappingSampleIterator(iterators, "")) result = append(result, iter.NewNonOverlappingSampleIterator(iterators, ""))
} }
return iter.NewHeapSampleIterator(it.ctx, result), nil return iter.NewMergeSampleIterator(it.ctx, result), nil
} }
func removeMatchersByName(matchers []*labels.Matcher, names ...string) []*labels.Matcher { func removeMatchersByName(matchers []*labels.Matcher, names ...string) []*labels.Matcher {

Loading…
Cancel
Save