package iter import ( "container/heap" "context" "io" "sort" "sync" "time" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logqlmodel/stats" "github.com/grafana/loki/pkg/util" ) // EntryIterator iterates over entries in time-order. type EntryIterator interface { Iterator Entry() logproto.Entry } // streamIterator iterates over entries in a stream. type streamIterator struct { i int stream logproto.Stream } // NewStreamIterator iterates over entries in a stream. func NewStreamIterator(stream logproto.Stream) EntryIterator { return &streamIterator{ i: -1, stream: stream, } } func (i *streamIterator) Next() bool { i.i++ return i.i < len(i.stream.Entries) } func (i *streamIterator) Error() error { return nil } func (i *streamIterator) Labels() string { return i.stream.Labels } func (i *streamIterator) StreamHash() uint64 { return i.stream.Hash } func (i *streamIterator) Entry() logproto.Entry { return i.stream.Entries[i.i] } func (i *streamIterator) Close() error { return nil } type iteratorHeap []EntryIterator func (h iteratorHeap) Len() int { return len(h) } func (h iteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } func (h iteratorHeap) Peek() EntryIterator { return h[0] } func (h *iteratorHeap) Push(x interface{}) { *h = append(*h, x.(EntryIterator)) } func (h *iteratorHeap) Pop() interface{} { old := *h n := len(old) x := old[n-1] *h = old[0 : n-1] return x } type iteratorSortHeap struct { iteratorHeap byAscendingTime bool } func (h iteratorSortHeap) Less(i, j int) bool { t1, t2 := h.iteratorHeap[i].Entry().Timestamp.UnixNano(), h.iteratorHeap[j].Entry().Timestamp.UnixNano() if t1 == t2 { return h.iteratorHeap[i].StreamHash() < h.iteratorHeap[j].StreamHash() } if h.byAscendingTime { return t1 < t2 } return t1 > t2 } // HeapIterator iterates over a heap of iterators with ability to push new iterators and get some properties like time of entry at peek and len // Not safe for concurrent use type HeapIterator interface { EntryIterator Peek() time.Time Len() int Push(EntryIterator) } // mergeEntryIterator iterates over a heap of iterators and merge duplicate entries. type mergeEntryIterator struct { heap interface { heap.Interface Peek() EntryIterator } is []EntryIterator // pushBuffer contains the list of iterators that needs to be pushed to the heap // This is to avoid allocations. pushBuffer []EntryIterator prefetched bool stats *stats.Context // buffer of entries to be returned by Next() // We buffer entries with the same timestamp to correctly dedupe them. buffer []entryWithLabels currEntry entryWithLabels errs []error } // NewMergeEntryIterator returns a new iterator which uses a heap to merge together entries for multiple iterators and deduplicate entries if any. // The iterator only order and merge entries across given `is` iterators, it does not merge entries within individual iterator. // This means using this iterator with a single iterator will result in the same result as the input iterator. // If you don't need to deduplicate entries, use `NewSortEntryIterator` instead. func NewMergeEntryIterator(ctx context.Context, is []EntryIterator, direction logproto.Direction) HeapIterator { result := &mergeEntryIterator{is: is, stats: stats.FromContext(ctx)} switch direction { case logproto.BACKWARD: result.heap = &iteratorSortHeap{iteratorHeap: make([]EntryIterator, 0, len(is)), byAscendingTime: false} case logproto.FORWARD: result.heap = &iteratorSortHeap{iteratorHeap: make([]EntryIterator, 0, len(is)), byAscendingTime: true} default: panic("bad direction") } result.buffer = make([]entryWithLabels, 0, len(is)) result.pushBuffer = make([]EntryIterator, 0, len(is)) return result } // prefetch iterates over all inner iterators to merge together, calls Next() on // each of them to prefetch the first entry and pushes of them - who are not // empty - to the heap func (i *mergeEntryIterator) prefetch() { if i.prefetched { return } i.prefetched = true for _, it := range i.is { i.requeue(it, false) } // We can now clear the list of input iterators to merge, given they have all // been processed and the non empty ones have been pushed to the heap i.is = nil } // requeue pushes the input ei EntryIterator to the heap, advancing it via an ei.Next() // call unless the advanced input parameter is true. In this latter case it expects that // the iterator has already been advanced before calling requeue(). // // If the iterator has no more entries or an error occur while advancing it, the iterator // is not pushed to the heap and any possible error captured, so that can be get via Error(). func (i *mergeEntryIterator) requeue(ei EntryIterator, advanced bool) { if advanced || ei.Next() { heap.Push(i.heap, ei) return } if err := ei.Error(); err != nil { i.errs = append(i.errs, err) } util.LogError("closing iterator", ei.Close) } func (i *mergeEntryIterator) Push(ei EntryIterator) { i.requeue(ei, false) } // Next pop iterators from the heap until it finds an entry with a different timestamp or stream hash. // For each iterators we also buffer entries with the current timestamp and stream hash deduping as we loop. // If the iterator is not fully exhausted, it is pushed back to the heap. func (i *mergeEntryIterator) Next() bool { i.prefetch() if len(i.buffer) != 0 { i.nextFromBuffer() return true } if i.heap.Len() == 0 { return false } // shortcut for the last iterator. if i.heap.Len() == 1 { i.currEntry.Entry = i.heap.Peek().Entry() i.currEntry.labels = i.heap.Peek().Labels() i.currEntry.streamHash = i.heap.Peek().StreamHash() if !i.heap.Peek().Next() { i.heap.Pop() } return true } // We support multiple entries with the same timestamp, and we want to // preserve their original order. We look at all the top entries in the // heap with the same timestamp, and pop the ones whose common value // occurs most often. Outer: for i.heap.Len() > 0 { next := i.heap.Peek() entry := next.Entry() if len(i.buffer) > 0 && (i.buffer[0].streamHash != next.StreamHash() || !i.buffer[0].Entry.Timestamp.Equal(entry.Timestamp)) { break } heap.Pop(i.heap) previous := i.buffer var dupe bool for _, t := range previous { if t.Entry.Line == entry.Line { i.stats.AddDuplicates(1) dupe = true break } } if !dupe { i.buffer = append(i.buffer, entryWithLabels{ Entry: entry, labels: next.Labels(), streamHash: next.StreamHash(), }) } inner: for { if !next.Next() { continue Outer } entry := next.Entry() if next.StreamHash() != i.buffer[0].streamHash || !entry.Timestamp.Equal(i.buffer[0].Entry.Timestamp) { break } for _, t := range previous { if t.Entry.Line == entry.Line { i.stats.AddDuplicates(1) continue inner } } i.buffer = append(i.buffer, entryWithLabels{ Entry: entry, labels: next.Labels(), streamHash: next.StreamHash(), }) } i.pushBuffer = append(i.pushBuffer, next) } for _, ei := range i.pushBuffer { heap.Push(i.heap, ei) } i.pushBuffer = i.pushBuffer[:0] i.nextFromBuffer() return true } func (i *mergeEntryIterator) nextFromBuffer() { i.currEntry.Entry = i.buffer[0].Entry i.currEntry.labels = i.buffer[0].labels i.currEntry.streamHash = i.buffer[0].streamHash if len(i.buffer) == 1 { i.buffer = i.buffer[:0] return } i.buffer = i.buffer[1:] } func (i *mergeEntryIterator) Entry() logproto.Entry { return i.currEntry.Entry } func (i *mergeEntryIterator) Labels() string { return i.currEntry.labels } func (i *mergeEntryIterator) StreamHash() uint64 { return i.currEntry.streamHash } func (i *mergeEntryIterator) Error() error { switch len(i.errs) { case 0: return nil case 1: return i.errs[0] default: return util.MultiError(i.errs) } } func (i *mergeEntryIterator) Close() error { for i.heap.Len() > 0 { if err := i.heap.Pop().(EntryIterator).Close(); err != nil { return err } } i.buffer = nil return nil } func (i *mergeEntryIterator) Peek() time.Time { i.prefetch() return i.heap.Peek().Entry().Timestamp } // Len returns the number of inner iterators on the heap, still having entries func (i *mergeEntryIterator) Len() int { i.prefetch() return i.heap.Len() } type entrySortIterator struct { is []EntryIterator prefetched bool byAscendingTime bool currEntry entryWithLabels errs []error } // NewSortEntryIterator returns a new EntryIterator that sorts entries by timestamp (depending on the direction) the input iterators. // The iterator only order entries across given `is` iterators, it does not sort entries within individual iterator. // This means using this iterator with a single iterator will result in the same result as the input iterator. // When timestamp is equal, the iterator sorts samples by their label alphabetically. func NewSortEntryIterator(is []EntryIterator, direction logproto.Direction) EntryIterator { if len(is) == 0 { return NoopIterator } if len(is) == 1 { return is[0] } result := &entrySortIterator{is: is} switch direction { case logproto.BACKWARD: result.byAscendingTime = false case logproto.FORWARD: result.byAscendingTime = true default: panic("bad direction") } return result } func (i *entrySortIterator) lessByIndex(k, j int) bool { t1, t2 := i.is[k].Entry().Timestamp.UnixNano(), i.is[j].Entry().Timestamp.UnixNano() if t1 == t2 { // The underlying stream hash may not be available, such as when merging LokiResponses in the // frontend which were sharded. Prefer to use the underlying stream hash when available, // which is needed in deduping code, but defer to label sorting when it's not present. if i.is[k].StreamHash() == 0 { return i.is[k].Labels() < i.is[j].Labels() } return i.is[k].StreamHash() < i.is[j].StreamHash() } if i.byAscendingTime { return t1 < t2 } return t1 > t2 } func (i *entrySortIterator) lessByValue(t1 int64, l1 uint64, lb string, index int) bool { t2 := i.is[index].Entry().Timestamp.UnixNano() if t1 == t2 { if l1 == 0 { return lb < i.is[index].Labels() } return l1 < i.is[index].StreamHash() } if i.byAscendingTime { return t1 < t2 } return t1 > t2 } // init throws out empty iterators and sorts them. func (i *entrySortIterator) init() { if i.prefetched { return } i.prefetched = true tmp := make([]EntryIterator, 0, len(i.is)) for _, it := range i.is { if it.Next() { tmp = append(tmp, it) continue } if err := it.Error(); err != nil { i.errs = append(i.errs, err) } util.LogError("closing iterator", it.Close) } i.is = tmp sort.Slice(i.is, i.lessByIndex) } func (i *entrySortIterator) fix() { head := i.is[0] t1 := head.Entry().Timestamp.UnixNano() l1 := head.StreamHash() lb := head.Labels() // shortcut if len(i.is) <= 1 || i.lessByValue(t1, l1, lb, 1) { return } // First element is out of place. So we reposition it. i.is = i.is[1:] // drop head index := sort.Search(len(i.is), func(in int) bool { return i.lessByValue(t1, l1, lb, in) }) if index == len(i.is) { i.is = append(i.is, head) } else { i.is = append(i.is[:index+1], i.is[index:]...) i.is[index] = head } } func (i *entrySortIterator) Next() bool { i.init() if len(i.is) == 0 { return false } next := i.is[0] i.currEntry.Entry = next.Entry() i.currEntry.labels = next.Labels() i.currEntry.streamHash = next.StreamHash() // if the top iterator is empty, we remove it. if !next.Next() { i.is = i.is[1:] if err := next.Error(); err != nil { i.errs = append(i.errs, err) } util.LogError("closing iterator", next.Close) return true } if len(i.is) > 1 { i.fix() } return true } func (i *entrySortIterator) Entry() logproto.Entry { return i.currEntry.Entry } func (i *entrySortIterator) Labels() string { return i.currEntry.labels } func (i *entrySortIterator) StreamHash() uint64 { return i.currEntry.streamHash } func (i *entrySortIterator) Error() error { switch len(i.errs) { case 0: return nil case 1: return i.errs[0] default: return util.MultiError(i.errs) } } func (i *entrySortIterator) Close() error { for _, entryIterator := range i.is { if err := entryIterator.Close(); err != nil { return err } } return nil } // NewStreamsIterator returns an iterator over logproto.Stream func NewStreamsIterator(streams []logproto.Stream, direction logproto.Direction) EntryIterator { is := make([]EntryIterator, 0, len(streams)) for i := range streams { is = append(is, NewStreamIterator(streams[i])) } return NewSortEntryIterator(is, direction) } // NewQueryResponseIterator returns an iterator over a QueryResponse. func NewQueryResponseIterator(resp *logproto.QueryResponse, direction logproto.Direction) EntryIterator { return NewStreamsIterator(resp.Streams, direction) } type queryClientIterator struct { client logproto.Querier_QueryClient direction logproto.Direction err error curr EntryIterator } // NewQueryClientIterator returns an iterator over a QueryClient. func NewQueryClientIterator(client logproto.Querier_QueryClient, direction logproto.Direction) EntryIterator { return &queryClientIterator{ client: client, direction: direction, } } func (i *queryClientIterator) Next() bool { ctx := i.client.Context() for i.curr == nil || !i.curr.Next() { batch, err := i.client.Recv() if err == io.EOF { return false } else if err != nil { i.err = err return false } stats.JoinIngesters(ctx, batch.Stats) i.curr = NewQueryResponseIterator(batch, i.direction) } return true } func (i *queryClientIterator) Entry() logproto.Entry { return i.curr.Entry() } func (i *queryClientIterator) Labels() string { return i.curr.Labels() } func (i *queryClientIterator) StreamHash() uint64 { return i.curr.StreamHash() } func (i *queryClientIterator) Error() error { return i.err } func (i *queryClientIterator) Close() error { return i.client.CloseSend() } type nonOverlappingIterator struct { iterators []EntryIterator curr EntryIterator } // NewNonOverlappingIterator gives a chained iterator over a list of iterators. func NewNonOverlappingIterator(iterators []EntryIterator) EntryIterator { return &nonOverlappingIterator{ iterators: iterators, } } func (i *nonOverlappingIterator) Next() bool { for i.curr == nil || !i.curr.Next() { if len(i.iterators) == 0 { if i.curr != nil { i.curr.Close() } return false } if i.curr != nil { i.curr.Close() } i.curr, i.iterators = i.iterators[0], i.iterators[1:] } return true } func (i *nonOverlappingIterator) Entry() logproto.Entry { return i.curr.Entry() } func (i *nonOverlappingIterator) Labels() string { if i.curr == nil { return "" } return i.curr.Labels() } func (i *nonOverlappingIterator) StreamHash() uint64 { if i.curr == nil { return 0 } return i.curr.StreamHash() } func (i *nonOverlappingIterator) Error() error { if i.curr == nil { return nil } return i.curr.Error() } func (i *nonOverlappingIterator) Close() error { if i.curr != nil { i.curr.Close() } for _, iter := range i.iterators { iter.Close() } i.iterators = nil return nil } type timeRangedIterator struct { EntryIterator mint, maxt time.Time } // NewTimeRangedIterator returns an iterator which filters entries by time range. // Note: Only works with iterators that go forwards. func NewTimeRangedIterator(it EntryIterator, mint, maxt time.Time) EntryIterator { return &timeRangedIterator{ EntryIterator: it, mint: mint, maxt: maxt, } } func (i *timeRangedIterator) Next() bool { ok := i.EntryIterator.Next() if !ok { i.EntryIterator.Close() return ok } ts := i.EntryIterator.Entry().Timestamp for ok && i.mint.After(ts) { ok = i.EntryIterator.Next() if !ok { continue } ts = i.EntryIterator.Entry().Timestamp } if ok { if ts.Equal(i.mint) { // The mint is inclusive return true } if i.maxt.Before(ts) || i.maxt.Equal(ts) { // The maxt is exclusive. ok = false } } if !ok { i.EntryIterator.Close() } return ok } type entryWithLabels struct { logproto.Entry labels string streamHash uint64 } type reverseIterator struct { iter EntryIterator cur entryWithLabels entriesWithLabels []entryWithLabels loaded bool limit uint32 } // NewReversedIter returns an iterator which loads all or up to N entries // of an existing iterator, and then iterates over them backward. // Preload entries when they are being queried with a timeout. func NewReversedIter(it EntryIterator, limit uint32, preload bool) (EntryIterator, error) { iter, err := &reverseIterator{ iter: it, entriesWithLabels: make([]entryWithLabels, 0, 1024), limit: limit, }, it.Error() if err != nil { return nil, err } if preload { iter.load() } return iter, nil } func (i *reverseIterator) load() { if !i.loaded { i.loaded = true for count := uint32(0); (i.limit == 0 || count < i.limit) && i.iter.Next(); count++ { i.entriesWithLabels = append(i.entriesWithLabels, entryWithLabels{i.iter.Entry(), i.iter.Labels(), i.iter.StreamHash()}) } i.iter.Close() } } func (i *reverseIterator) Next() bool { i.load() if len(i.entriesWithLabels) == 0 { i.entriesWithLabels = nil return false } i.cur, i.entriesWithLabels = i.entriesWithLabels[len(i.entriesWithLabels)-1], i.entriesWithLabels[:len(i.entriesWithLabels)-1] return true } func (i *reverseIterator) Entry() logproto.Entry { return i.cur.Entry } func (i *reverseIterator) Labels() string { return i.cur.labels } func (i *reverseIterator) StreamHash() uint64 { return i.cur.streamHash } func (i *reverseIterator) Error() error { return nil } func (i *reverseIterator) Close() error { if !i.loaded { return i.iter.Close() } return nil } var entryBufferPool = sync.Pool{ New: func() interface{} { return &entryBuffer{ entries: make([]entryWithLabels, 0, 1024), } }, } type entryBuffer struct { entries []entryWithLabels } type reverseEntryIterator struct { iter EntryIterator cur entryWithLabels buf *entryBuffer loaded bool } // NewEntryReversedIter returns an iterator which loads all entries and iterates backward. // The labels of entries is always empty. func NewEntryReversedIter(it EntryIterator) (EntryIterator, error) { iter, err := &reverseEntryIterator{ iter: it, buf: entryBufferPool.Get().(*entryBuffer), }, it.Error() if err != nil { return nil, err } return iter, nil } func (i *reverseEntryIterator) load() { if !i.loaded { i.loaded = true for i.iter.Next() { i.buf.entries = append(i.buf.entries, entryWithLabels{i.iter.Entry(), i.iter.Labels(), i.iter.StreamHash()}) } i.iter.Close() } } func (i *reverseEntryIterator) Next() bool { i.load() if i.buf == nil || len(i.buf.entries) == 0 { i.release() return false } i.cur, i.buf.entries = i.buf.entries[len(i.buf.entries)-1], i.buf.entries[:len(i.buf.entries)-1] return true } func (i *reverseEntryIterator) Entry() logproto.Entry { return i.cur.Entry } func (i *reverseEntryIterator) Labels() string { return i.cur.labels } func (i *reverseEntryIterator) StreamHash() uint64 { return i.cur.streamHash } func (i *reverseEntryIterator) Error() error { return nil } func (i *reverseEntryIterator) release() { if i.buf == nil { return } if i.buf.entries != nil { // preserve the underlying slice before releasing to pool i.buf.entries = i.buf.entries[:0] } entryBufferPool.Put(i.buf) i.buf = nil } func (i *reverseEntryIterator) Close() error { i.release() if !i.loaded { return i.iter.Close() } return nil } // ReadBatch reads a set of entries off an iterator. func ReadBatch(i EntryIterator, size uint32) (*logproto.QueryResponse, uint32, error) { var ( streams = map[uint64]map[string]*logproto.Stream{} respSize uint32 streamsCount int ) for ; respSize < size && i.Next(); respSize++ { labels, hash, entry := i.Labels(), i.StreamHash(), i.Entry() mutatedStreams, ok := streams[hash] if !ok { mutatedStreams = map[string]*logproto.Stream{} streams[hash] = mutatedStreams } mutatedStream, ok := mutatedStreams[labels] if !ok { streamsCount++ mutatedStream = &logproto.Stream{ Labels: labels, Hash: hash, } mutatedStreams[labels] = mutatedStream } mutatedStream.Entries = append(mutatedStream.Entries, entry) } result := logproto.QueryResponse{ Streams: make([]logproto.Stream, 0, streamsCount), } for _, mutatedStreams := range streams { for _, s := range mutatedStreams { result.Streams = append(result.Streams, *s) } } return &result, respSize, i.Error() } type peekingEntryIterator struct { iter EntryIterator cache *entryWithLabels next *entryWithLabels } // PeekingEntryIterator is an entry iterator that can look ahead an entry // using `Peek` without advancing its cursor. type PeekingEntryIterator interface { EntryIterator Peek() (string, logproto.Entry, bool) } // NewPeekingIterator creates a new peeking iterator. func NewPeekingIterator(iter EntryIterator) PeekingEntryIterator { // initialize the next entry so we can peek right from the start. var cache *entryWithLabels next := &entryWithLabels{} if iter.Next() { cache = &entryWithLabels{ Entry: iter.Entry(), labels: iter.Labels(), streamHash: iter.StreamHash(), } next.Entry = cache.Entry next.labels = cache.labels } return &peekingEntryIterator{ iter: iter, cache: cache, next: next, } } // Next implements `EntryIterator` func (it *peekingEntryIterator) Next() bool { if it.cache != nil { it.next.Entry = it.cache.Entry it.next.labels = it.cache.labels it.next.streamHash = it.cache.streamHash it.cacheNext() return true } return false } // cacheNext caches the next element if it exists. func (it *peekingEntryIterator) cacheNext() { if it.iter.Next() { it.cache.Entry = it.iter.Entry() it.cache.labels = it.iter.Labels() it.cache.streamHash = it.iter.StreamHash() return } // nothing left removes the cached entry it.cache = nil } // Peek implements `PeekingEntryIterator` func (it *peekingEntryIterator) Peek() (string, logproto.Entry, bool) { if it.cache != nil { return it.cache.labels, it.cache.Entry, true } return "", logproto.Entry{}, false } // Labels implements `EntryIterator` func (it *peekingEntryIterator) Labels() string { if it.next != nil { return it.next.labels } return "" } func (it *peekingEntryIterator) StreamHash() uint64 { if it.next != nil { return it.next.streamHash } return 0 } // Entry implements `EntryIterator` func (it *peekingEntryIterator) Entry() logproto.Entry { if it.next != nil { return it.next.Entry } return logproto.Entry{} } // Error implements `EntryIterator` func (it *peekingEntryIterator) Error() error { return it.iter.Error() } // Close implements `EntryIterator` func (it *peekingEntryIterator) Close() error { return it.iter.Close() }