mirror of https://github.com/grafana/loki
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
676 lines
16 KiB
676 lines
16 KiB
package iter
|
|
|
|
import (
|
|
"container/heap"
|
|
"fmt"
|
|
"io"
|
|
"sort"
|
|
"time"
|
|
|
|
"github.com/grafana/loki/pkg/helpers"
|
|
"github.com/grafana/loki/pkg/logproto"
|
|
)
|
|
|
|
// EntryIterator iterates over entries in time-order.
|
|
type EntryIterator interface {
|
|
Next() bool
|
|
Entry() logproto.Entry
|
|
Labels() string
|
|
Error() error
|
|
Close() error
|
|
}
|
|
|
|
// streamIterator iterates over entries in a stream.
|
|
type streamIterator struct {
|
|
i int
|
|
entries []logproto.Entry
|
|
labels string
|
|
}
|
|
|
|
// NewStreamIterator iterates over entries in a stream.
|
|
func NewStreamIterator(stream *logproto.Stream) EntryIterator {
|
|
return &streamIterator{
|
|
i: -1,
|
|
entries: stream.Entries,
|
|
labels: stream.Labels,
|
|
}
|
|
}
|
|
|
|
func (i *streamIterator) Next() bool {
|
|
i.i++
|
|
return i.i < len(i.entries)
|
|
}
|
|
|
|
func (i *streamIterator) Error() error {
|
|
return nil
|
|
}
|
|
|
|
func (i *streamIterator) Labels() string {
|
|
return i.labels
|
|
}
|
|
|
|
func (i *streamIterator) Entry() logproto.Entry {
|
|
return i.entries[i.i]
|
|
}
|
|
|
|
func (i *streamIterator) Close() error {
|
|
return nil
|
|
}
|
|
|
|
type iteratorHeap []EntryIterator
|
|
|
|
func (h iteratorHeap) Len() int { return len(h) }
|
|
func (h iteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
|
|
func (h iteratorHeap) Peek() EntryIterator { return h[0] }
|
|
func (h *iteratorHeap) Push(x interface{}) {
|
|
*h = append(*h, x.(EntryIterator))
|
|
}
|
|
|
|
func (h *iteratorHeap) Pop() interface{} {
|
|
old := *h
|
|
n := len(old)
|
|
x := old[n-1]
|
|
*h = old[0 : n-1]
|
|
return x
|
|
}
|
|
|
|
type iteratorMinHeap struct {
|
|
iteratorHeap
|
|
}
|
|
|
|
func (h iteratorMinHeap) Less(i, j int) bool {
|
|
t1, t2 := h.iteratorHeap[i].Entry().Timestamp, h.iteratorHeap[j].Entry().Timestamp
|
|
if !t1.Equal(t2) {
|
|
return t1.Before(t2)
|
|
}
|
|
return h.iteratorHeap[i].Labels() < h.iteratorHeap[j].Labels()
|
|
}
|
|
|
|
type iteratorMaxHeap struct {
|
|
iteratorHeap
|
|
}
|
|
|
|
func (h iteratorMaxHeap) Less(i, j int) bool {
|
|
t1, t2 := h.iteratorHeap[i].Entry().Timestamp, h.iteratorHeap[j].Entry().Timestamp
|
|
if !t1.Equal(t2) {
|
|
return t1.After(t2)
|
|
}
|
|
return h.iteratorHeap[i].Labels() > h.iteratorHeap[j].Labels()
|
|
}
|
|
|
|
// HeapIterator iterates over a heap of iterators with ability to push new iterators and get some properties like time of entry at peek and len
|
|
// Not safe for concurrent use
|
|
type HeapIterator interface {
|
|
EntryIterator
|
|
Peek() time.Time
|
|
Len() int
|
|
Push(EntryIterator)
|
|
}
|
|
|
|
// heapIterator iterates over a heap of iterators.
|
|
type heapIterator struct {
|
|
heap interface {
|
|
heap.Interface
|
|
Peek() EntryIterator
|
|
}
|
|
is []EntryIterator
|
|
prefetched bool
|
|
|
|
tuples tuples
|
|
currEntry logproto.Entry
|
|
currLabels string
|
|
errs []error
|
|
}
|
|
|
|
// NewHeapIterator returns a new iterator which uses a heap to merge together
|
|
// entries for multiple interators.
|
|
func NewHeapIterator(is []EntryIterator, direction logproto.Direction) HeapIterator {
|
|
result := &heapIterator{is: is}
|
|
switch direction {
|
|
case logproto.BACKWARD:
|
|
result.heap = &iteratorMaxHeap{}
|
|
case logproto.FORWARD:
|
|
result.heap = &iteratorMinHeap{}
|
|
default:
|
|
panic("bad direction")
|
|
}
|
|
|
|
result.tuples = make([]tuple, 0, len(is))
|
|
return result
|
|
}
|
|
|
|
// prefetch iterates over all inner iterators to merge together, calls Next() on
|
|
// each of them to prefetch the first entry and pushes of them - who are not
|
|
// empty - to the heap
|
|
func (i *heapIterator) prefetch() {
|
|
if i.prefetched {
|
|
return
|
|
}
|
|
|
|
i.prefetched = true
|
|
for _, it := range i.is {
|
|
i.requeue(it, false)
|
|
}
|
|
|
|
// We can now clear the list of input iterators to merge, given they have all
|
|
// been processed and the non empty ones have been pushed to the heap
|
|
i.is = nil
|
|
}
|
|
|
|
// requeue pushes the input ei EntryIterator to the heap, advancing it via an ei.Next()
|
|
// call unless the advanced input parameter is true. In this latter case it expects that
|
|
// the iterator has already been advanced before calling requeue().
|
|
//
|
|
// If the iterator has no more entries or an error occur while advancing it, the iterator
|
|
// is not pushed to the heap and any possible error captured, so that can be get via Error().
|
|
func (i *heapIterator) requeue(ei EntryIterator, advanced bool) {
|
|
if advanced || ei.Next() {
|
|
heap.Push(i.heap, ei)
|
|
return
|
|
}
|
|
|
|
if err := ei.Error(); err != nil {
|
|
i.errs = append(i.errs, err)
|
|
}
|
|
helpers.LogError("closing iterator", ei.Close)
|
|
}
|
|
|
|
func (i *heapIterator) Push(ei EntryIterator) {
|
|
i.requeue(ei, false)
|
|
}
|
|
|
|
type tuple struct {
|
|
logproto.Entry
|
|
EntryIterator
|
|
}
|
|
|
|
type tuples []tuple
|
|
|
|
func (t tuples) Len() int { return len(t) }
|
|
func (t tuples) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
|
|
func (t tuples) Less(i, j int) bool { return t[i].Line < t[j].Line }
|
|
|
|
func (i *heapIterator) Next() bool {
|
|
i.prefetch()
|
|
|
|
if i.heap.Len() == 0 {
|
|
return false
|
|
}
|
|
|
|
// We support multiple entries with the same timestamp, and we want to
|
|
// preserve their original order. We look at all the top entries in the
|
|
// heap with the same timestamp, and pop the ones whose common value
|
|
// occurs most often.
|
|
for i.heap.Len() > 0 {
|
|
next := i.heap.Peek()
|
|
entry := next.Entry()
|
|
if len(i.tuples) > 0 && (i.tuples[0].Labels() != next.Labels() || !i.tuples[0].Timestamp.Equal(entry.Timestamp)) {
|
|
break
|
|
}
|
|
|
|
heap.Pop(i.heap)
|
|
i.tuples = append(i.tuples, tuple{
|
|
Entry: entry,
|
|
EntryIterator: next,
|
|
})
|
|
}
|
|
|
|
// Find in tuples which entry occurs most often which, due to quorum based
|
|
// replication, is guaranteed to be the correct next entry.
|
|
t := mostCommon(i.tuples)
|
|
i.currEntry = t.Entry
|
|
i.currLabels = t.Labels()
|
|
|
|
// Requeue the iterators, advancing them if they were consumed.
|
|
for j := range i.tuples {
|
|
i.requeue(i.tuples[j].EntryIterator, i.tuples[j].Line != i.currEntry.Line)
|
|
}
|
|
i.tuples = i.tuples[:0]
|
|
return true
|
|
}
|
|
|
|
func mostCommon(tuples tuples) tuple {
|
|
sort.Sort(tuples)
|
|
result := tuples[0]
|
|
count, max := 0, 0
|
|
for i := 0; i < len(tuples)-1; i++ {
|
|
if tuples[i].Line == tuples[i+1].Line {
|
|
count++
|
|
continue
|
|
}
|
|
if count > max {
|
|
result = tuples[i]
|
|
max = count
|
|
}
|
|
count = 0
|
|
}
|
|
if count > max {
|
|
result = tuples[len(tuples)-1]
|
|
}
|
|
return result
|
|
}
|
|
|
|
func (i *heapIterator) Entry() logproto.Entry {
|
|
return i.currEntry
|
|
}
|
|
|
|
func (i *heapIterator) Labels() string {
|
|
return i.currLabels
|
|
}
|
|
|
|
func (i *heapIterator) Error() error {
|
|
switch len(i.errs) {
|
|
case 0:
|
|
return nil
|
|
case 1:
|
|
return i.errs[0]
|
|
default:
|
|
return fmt.Errorf("Multiple errors: %+v", i.errs)
|
|
}
|
|
}
|
|
|
|
func (i *heapIterator) Close() error {
|
|
for i.heap.Len() > 0 {
|
|
if err := i.heap.Pop().(EntryIterator).Close(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
i.tuples = nil
|
|
return nil
|
|
}
|
|
|
|
func (i *heapIterator) Peek() time.Time {
|
|
i.prefetch()
|
|
|
|
return i.heap.Peek().Entry().Timestamp
|
|
}
|
|
|
|
// Len returns the number of inner iterators on the heap, still having entries
|
|
func (i *heapIterator) Len() int {
|
|
i.prefetch()
|
|
|
|
return i.heap.Len()
|
|
}
|
|
|
|
// NewStreamsIterator returns an iterator over logproto.Stream
|
|
func NewStreamsIterator(streams []*logproto.Stream, direction logproto.Direction) EntryIterator {
|
|
is := make([]EntryIterator, 0, len(streams))
|
|
for i := range streams {
|
|
is = append(is, NewStreamIterator(streams[i]))
|
|
}
|
|
return NewHeapIterator(is, direction)
|
|
}
|
|
|
|
// NewQueryResponseIterator returns an iterator over a QueryResponse.
|
|
func NewQueryResponseIterator(resp *logproto.QueryResponse, direction logproto.Direction) EntryIterator {
|
|
is := make([]EntryIterator, 0, len(resp.Streams))
|
|
for i := range resp.Streams {
|
|
is = append(is, NewStreamIterator(resp.Streams[i]))
|
|
}
|
|
return NewHeapIterator(is, direction)
|
|
}
|
|
|
|
type queryClientIterator struct {
|
|
client logproto.Querier_QueryClient
|
|
direction logproto.Direction
|
|
err error
|
|
curr EntryIterator
|
|
}
|
|
|
|
// NewQueryClientIterator returns an iterator over a QueryClient.
|
|
func NewQueryClientIterator(client logproto.Querier_QueryClient, direction logproto.Direction) EntryIterator {
|
|
return &queryClientIterator{
|
|
client: client,
|
|
direction: direction,
|
|
}
|
|
}
|
|
|
|
func (i *queryClientIterator) Next() bool {
|
|
for i.curr == nil || !i.curr.Next() {
|
|
batch, err := i.client.Recv()
|
|
if err == io.EOF {
|
|
return false
|
|
} else if err != nil {
|
|
i.err = err
|
|
return false
|
|
}
|
|
|
|
i.curr = NewQueryResponseIterator(batch, i.direction)
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func (i *queryClientIterator) Entry() logproto.Entry {
|
|
return i.curr.Entry()
|
|
}
|
|
|
|
func (i *queryClientIterator) Labels() string {
|
|
return i.curr.Labels()
|
|
}
|
|
|
|
func (i *queryClientIterator) Error() error {
|
|
return i.err
|
|
}
|
|
|
|
func (i *queryClientIterator) Close() error {
|
|
return i.client.CloseSend()
|
|
}
|
|
|
|
type nonOverlappingIterator struct {
|
|
labels string
|
|
i int
|
|
iterators []EntryIterator
|
|
curr EntryIterator
|
|
}
|
|
|
|
// NewNonOverlappingIterator gives a chained iterator over a list of iterators.
|
|
func NewNonOverlappingIterator(iterators []EntryIterator, labels string) EntryIterator {
|
|
return &nonOverlappingIterator{
|
|
labels: labels,
|
|
iterators: iterators,
|
|
}
|
|
}
|
|
|
|
func (i *nonOverlappingIterator) Next() bool {
|
|
for i.curr == nil || !i.curr.Next() {
|
|
if len(i.iterators) == 0 {
|
|
if i.curr != nil {
|
|
i.curr.Close()
|
|
}
|
|
return false
|
|
}
|
|
if i.curr != nil {
|
|
i.curr.Close()
|
|
}
|
|
i.i++
|
|
i.curr, i.iterators = i.iterators[0], i.iterators[1:]
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func (i *nonOverlappingIterator) Entry() logproto.Entry {
|
|
return i.curr.Entry()
|
|
}
|
|
|
|
func (i *nonOverlappingIterator) Labels() string {
|
|
if i.labels != "" {
|
|
return i.labels
|
|
}
|
|
|
|
return i.curr.Labels()
|
|
}
|
|
|
|
func (i *nonOverlappingIterator) Error() error {
|
|
if i.curr != nil {
|
|
return i.curr.Error()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (i *nonOverlappingIterator) Close() error {
|
|
for _, iter := range i.iterators {
|
|
iter.Close()
|
|
}
|
|
i.iterators = nil
|
|
return nil
|
|
}
|
|
|
|
type timeRangedIterator struct {
|
|
EntryIterator
|
|
mint, maxt time.Time
|
|
}
|
|
|
|
// NewTimeRangedIterator returns an iterator which filters entries by time range.
|
|
func NewTimeRangedIterator(it EntryIterator, mint, maxt time.Time) EntryIterator {
|
|
return &timeRangedIterator{
|
|
EntryIterator: it,
|
|
mint: mint,
|
|
maxt: maxt,
|
|
}
|
|
}
|
|
|
|
func (i *timeRangedIterator) Next() bool {
|
|
ok := i.EntryIterator.Next()
|
|
if !ok {
|
|
i.EntryIterator.Close()
|
|
return ok
|
|
}
|
|
ts := i.EntryIterator.Entry().Timestamp
|
|
for ok && i.mint.After(ts) {
|
|
ok = i.EntryIterator.Next()
|
|
ts = i.EntryIterator.Entry().Timestamp
|
|
}
|
|
|
|
if ok && (i.maxt.Before(ts) || i.maxt.Equal(ts)) { // The maxt is exclusive.
|
|
ok = false
|
|
}
|
|
if !ok {
|
|
i.EntryIterator.Close()
|
|
}
|
|
return ok
|
|
}
|
|
|
|
type entryIteratorBackward struct {
|
|
forwardIter EntryIterator
|
|
cur logproto.Entry
|
|
entries []logproto.Entry
|
|
loaded bool
|
|
}
|
|
|
|
// NewEntryIteratorBackward returns an iterator which loads all the entries
|
|
// of an existing iterator, and then iterates over them backward.
|
|
func NewEntryIteratorBackward(it EntryIterator) (EntryIterator, error) {
|
|
return &entryIteratorBackward{entries: make([]logproto.Entry, 0, 1024), forwardIter: it}, it.Error()
|
|
}
|
|
|
|
func (i *entryIteratorBackward) load() {
|
|
if !i.loaded {
|
|
i.loaded = true
|
|
for i.forwardIter.Next() {
|
|
entry := i.forwardIter.Entry()
|
|
i.entries = append(i.entries, entry)
|
|
}
|
|
i.forwardIter.Close()
|
|
}
|
|
}
|
|
|
|
func (i *entryIteratorBackward) Next() bool {
|
|
i.load()
|
|
if len(i.entries) == 0 {
|
|
i.entries = nil
|
|
return false
|
|
}
|
|
i.cur, i.entries = i.entries[len(i.entries)-1], i.entries[:len(i.entries)-1]
|
|
return true
|
|
}
|
|
|
|
func (i *entryIteratorBackward) Entry() logproto.Entry {
|
|
return i.cur
|
|
}
|
|
|
|
func (i *entryIteratorBackward) Close() error { return nil }
|
|
|
|
func (i *entryIteratorBackward) Error() error { return nil }
|
|
|
|
func (i *entryIteratorBackward) Labels() string {
|
|
return ""
|
|
}
|
|
|
|
// ReadBatch reads a set of entries off an iterator.
|
|
func ReadBatch(i EntryIterator, size uint32) (*logproto.QueryResponse, uint32, error) {
|
|
streams := map[string]*logproto.Stream{}
|
|
respSize := uint32(0)
|
|
for ; respSize < size && i.Next(); respSize++ {
|
|
labels, entry := i.Labels(), i.Entry()
|
|
stream, ok := streams[labels]
|
|
if !ok {
|
|
stream = &logproto.Stream{
|
|
Labels: labels,
|
|
}
|
|
streams[labels] = stream
|
|
}
|
|
stream.Entries = append(stream.Entries, entry)
|
|
}
|
|
|
|
result := logproto.QueryResponse{
|
|
Streams: make([]*logproto.Stream, 0, len(streams)),
|
|
}
|
|
for _, stream := range streams {
|
|
result.Streams = append(result.Streams, stream)
|
|
}
|
|
return &result, respSize, i.Error()
|
|
}
|
|
|
|
type entryWithLabels struct {
|
|
entry logproto.Entry
|
|
labels string
|
|
}
|
|
|
|
type entryIteratorForward struct {
|
|
backwardIter EntryIterator
|
|
cur entryWithLabels
|
|
entriesWithLabels []entryWithLabels
|
|
loaded bool
|
|
limit uint32
|
|
}
|
|
|
|
// NewEntryIteratorBackward returns an iterator which loads all or upton N entries
|
|
// of an existing iterator, and then iterates over them backward.
|
|
// preload entries when they are being queried with a timeout
|
|
func NewEntryIteratorForward(it EntryIterator, limit uint32, preload bool) (EntryIterator, error) {
|
|
itr, err := &entryIteratorForward{entriesWithLabels: make([]entryWithLabels, 0, 1024), backwardIter: it, limit: limit}, it.Error()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if preload {
|
|
itr.load()
|
|
}
|
|
|
|
return itr, nil
|
|
}
|
|
|
|
func (i *entryIteratorForward) load() {
|
|
if !i.loaded {
|
|
i.loaded = true
|
|
for count := uint32(0); (i.limit == 0 || count < i.limit) && i.backwardIter.Next(); count++ {
|
|
i.entriesWithLabels = append(i.entriesWithLabels, entryWithLabels{i.backwardIter.Entry(), i.backwardIter.Labels()})
|
|
}
|
|
i.backwardIter.Close()
|
|
}
|
|
}
|
|
|
|
func (i *entryIteratorForward) Next() bool {
|
|
i.load()
|
|
if len(i.entriesWithLabels) == 0 {
|
|
i.entriesWithLabels = nil
|
|
return false
|
|
}
|
|
i.cur, i.entriesWithLabels = i.entriesWithLabels[len(i.entriesWithLabels)-1], i.entriesWithLabels[:len(i.entriesWithLabels)-1]
|
|
return true
|
|
}
|
|
|
|
func (i *entryIteratorForward) Entry() logproto.Entry {
|
|
return i.cur.entry
|
|
}
|
|
|
|
func (i *entryIteratorForward) Close() error { return nil }
|
|
|
|
func (i *entryIteratorForward) Error() error { return nil }
|
|
|
|
func (i *entryIteratorForward) Labels() string {
|
|
return i.cur.labels
|
|
}
|
|
|
|
type peekingEntryIterator struct {
|
|
iter EntryIterator
|
|
|
|
cache *entryWithLabels
|
|
next *entryWithLabels
|
|
}
|
|
|
|
// PeekingEntryIterator is an entry iterator that can look ahead an entry
|
|
// using `Peek` without advancing its cursor.
|
|
type PeekingEntryIterator interface {
|
|
EntryIterator
|
|
Peek() (string, logproto.Entry, bool)
|
|
}
|
|
|
|
// NewPeekingIterator creates a new peeking iterator.
|
|
func NewPeekingIterator(iter EntryIterator) PeekingEntryIterator {
|
|
// initialize the next entry so we can peek right from the start.
|
|
var cache *entryWithLabels
|
|
if iter.Next() {
|
|
cache = &entryWithLabels{
|
|
entry: iter.Entry(),
|
|
labels: iter.Labels(),
|
|
}
|
|
}
|
|
return &peekingEntryIterator{
|
|
iter: iter,
|
|
cache: cache,
|
|
next: cache,
|
|
}
|
|
}
|
|
|
|
// Next implements `EntryIterator`
|
|
func (it *peekingEntryIterator) Next() bool {
|
|
if it.cache != nil {
|
|
it.next = &entryWithLabels{
|
|
entry: it.cache.entry,
|
|
labels: it.cache.labels,
|
|
}
|
|
it.cacheNext()
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// cacheNext caches the next element if it exists.
|
|
func (it *peekingEntryIterator) cacheNext() {
|
|
if it.iter.Next() {
|
|
it.cache = &entryWithLabels{
|
|
entry: it.iter.Entry(),
|
|
labels: it.iter.Labels(),
|
|
}
|
|
return
|
|
}
|
|
// nothing left removes the cached entry
|
|
it.cache = nil
|
|
}
|
|
|
|
// Peek implements `PeekingEntryIterator`
|
|
func (it *peekingEntryIterator) Peek() (string, logproto.Entry, bool) {
|
|
if it.cache != nil {
|
|
return it.cache.labels, it.cache.entry, true
|
|
}
|
|
return "", logproto.Entry{}, false
|
|
}
|
|
|
|
// Labels implements `EntryIterator`
|
|
func (it *peekingEntryIterator) Labels() string {
|
|
if it.next != nil {
|
|
return it.next.labels
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// Entry implements `EntryIterator`
|
|
func (it *peekingEntryIterator) Entry() logproto.Entry {
|
|
if it.next != nil {
|
|
return it.next.entry
|
|
}
|
|
return logproto.Entry{}
|
|
}
|
|
|
|
// Error implements `EntryIterator`
|
|
func (it *peekingEntryIterator) Error() error {
|
|
return it.iter.Error()
|
|
}
|
|
|
|
// Close implements `EntryIterator`
|
|
func (it *peekingEntryIterator) Close() error {
|
|
return it.iter.Close()
|
|
}
|
|
|