mirror of https://github.com/grafana/loki
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
267 lines
5.0 KiB
267 lines
5.0 KiB
|
8 years ago
|
package iter
|
||
|
8 years ago
|
|
||
|
|
import (
|
||
|
|
"container/heap"
|
||
|
8 years ago
|
"fmt"
|
||
|
8 years ago
|
"io"
|
||
|
8 years ago
|
"regexp"
|
||
|
8 years ago
|
|
||
|
|
"github.com/grafana/logish/pkg/logproto"
|
||
|
|
)
|
||
|
|
|
||
|
|
// EntryIterator iterates over entries in time-order.
|
||
|
|
type EntryIterator interface {
|
||
|
|
Next() bool
|
||
|
|
Entry() logproto.Entry
|
||
|
|
Labels() string
|
||
|
|
Error() error
|
||
|
|
Close() error
|
||
|
|
}
|
||
|
|
|
||
|
|
// streamIterator iterates over entries in a stream.
|
||
|
|
type streamIterator struct {
|
||
|
|
i int
|
||
|
|
entries []logproto.Entry
|
||
|
|
labels string
|
||
|
|
}
|
||
|
|
|
||
|
|
func newStreamIterator(stream *logproto.Stream) EntryIterator {
|
||
|
|
return &streamIterator{
|
||
|
|
i: -1,
|
||
|
|
entries: stream.Entries,
|
||
|
|
labels: stream.Labels,
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (i *streamIterator) Next() bool {
|
||
|
8 years ago
|
i.i++
|
||
|
8 years ago
|
return i.i < len(i.entries)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (i *streamIterator) Error() error {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (i *streamIterator) Labels() string {
|
||
|
|
return i.labels
|
||
|
|
}
|
||
|
|
|
||
|
|
func (i *streamIterator) Entry() logproto.Entry {
|
||
|
|
return i.entries[i.i]
|
||
|
|
}
|
||
|
|
|
||
|
|
func (i *streamIterator) Close() error {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
type iteratorHeap []EntryIterator
|
||
|
|
|
||
|
8 years ago
|
func (h iteratorHeap) Len() int { return len(h) }
|
||
|
|
func (h iteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
|
||
|
|
func (h iteratorHeap) Peek() EntryIterator { return h[0] }
|
||
|
8 years ago
|
func (h *iteratorHeap) Push(x interface{}) {
|
||
|
|
*h = append(*h, x.(EntryIterator))
|
||
|
|
}
|
||
|
|
|
||
|
|
func (h *iteratorHeap) Pop() interface{} {
|
||
|
|
old := *h
|
||
|
|
n := len(old)
|
||
|
|
x := old[n-1]
|
||
|
|
*h = old[0 : n-1]
|
||
|
|
return x
|
||
|
|
}
|
||
|
|
|
||
|
8 years ago
|
type iteratorMinHeap struct {
|
||
|
|
iteratorHeap
|
||
|
|
}
|
||
|
|
|
||
|
|
func (h iteratorMinHeap) Less(i, j int) bool {
|
||
|
|
return h.iteratorHeap[i].Entry().Timestamp.Before(h.iteratorHeap[j].Entry().Timestamp)
|
||
|
|
}
|
||
|
|
|
||
|
|
type iteratorMaxHeap struct {
|
||
|
|
iteratorHeap
|
||
|
|
}
|
||
|
|
|
||
|
|
func (h iteratorMaxHeap) Less(i, j int) bool {
|
||
|
|
return h.iteratorHeap[i].Entry().Timestamp.After(h.iteratorHeap[j].Entry().Timestamp)
|
||
|
|
}
|
||
|
|
|
||
|
8 years ago
|
// heapIterator iterates over a heap of iterators.
|
||
|
|
type heapIterator struct {
|
||
|
8 years ago
|
heap interface {
|
||
|
|
heap.Interface
|
||
|
|
Peek() EntryIterator
|
||
|
|
}
|
||
|
8 years ago
|
curr EntryIterator
|
||
|
|
errs []error
|
||
|
8 years ago
|
}
|
||
|
|
|
||
|
8 years ago
|
func NewHeapIterator(is []EntryIterator, direction logproto.Direction) EntryIterator {
|
||
|
|
result := &heapIterator{}
|
||
|
|
iterators := make(iteratorHeap, 0, len(is))
|
||
|
8 years ago
|
// pre-next each iterator, drop empty.
|
||
|
|
for _, i := range is {
|
||
|
|
if i.Next() {
|
||
|
8 years ago
|
iterators = append(iterators, i)
|
||
|
8 years ago
|
} else {
|
||
|
8 years ago
|
result.recordError(i)
|
||
|
8 years ago
|
i.Close()
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
8 years ago
|
switch direction {
|
||
|
|
case logproto.BACKWARD:
|
||
|
|
result.heap = &iteratorMaxHeap{iterators}
|
||
|
|
case logproto.FORWARD:
|
||
|
|
result.heap = &iteratorMinHeap{iterators}
|
||
|
|
default:
|
||
|
|
panic("bad direction")
|
||
|
|
}
|
||
|
8 years ago
|
return result
|
||
|
|
}
|
||
|
|
|
||
|
8 years ago
|
func (i *heapIterator) recordError(ei EntryIterator) {
|
||
|
|
err := ei.Error()
|
||
|
|
if err != nil {
|
||
|
|
i.errs = append(i.errs, err)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
8 years ago
|
func (i *heapIterator) Next() bool {
|
||
|
|
if i.curr != nil {
|
||
|
|
if i.curr.Next() {
|
||
|
8 years ago
|
heap.Push(i.heap, i.curr)
|
||
|
8 years ago
|
} else {
|
||
|
8 years ago
|
i.recordError(i.curr)
|
||
|
8 years ago
|
i.curr.Close()
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
8 years ago
|
if i.heap.Len() == 0 {
|
||
|
8 years ago
|
return false
|
||
|
|
}
|
||
|
|
|
||
|
8 years ago
|
i.curr = heap.Pop(i.heap).(EntryIterator)
|
||
|
8 years ago
|
|
||
|
|
// keep popping entries off if they match, to dedupe
|
||
|
8 years ago
|
curr := i.curr.Entry()
|
||
|
8 years ago
|
for i.heap.Len() > 0 {
|
||
|
|
next := i.heap.Peek().Entry()
|
||
|
8 years ago
|
if curr.Equal(next) {
|
||
|
|
i.heap.Pop()
|
||
|
|
} else {
|
||
|
8 years ago
|
break
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
8 years ago
|
return true
|
||
|
|
}
|
||
|
|
|
||
|
|
func (i *heapIterator) Entry() logproto.Entry {
|
||
|
|
return i.curr.Entry()
|
||
|
|
}
|
||
|
|
|
||
|
|
func (i *heapIterator) Labels() string {
|
||
|
|
return i.curr.Labels()
|
||
|
|
}
|
||
|
|
|
||
|
|
func (i *heapIterator) Error() error {
|
||
|
8 years ago
|
switch len(i.errs) {
|
||
|
|
case 0:
|
||
|
|
return nil
|
||
|
|
case 1:
|
||
|
|
return i.errs[0]
|
||
|
|
default:
|
||
|
|
return fmt.Errorf("Multiple errors: %+v", i.errs)
|
||
|
8 years ago
|
}
|
||
|
8 years ago
|
}
|
||
|
|
|
||
|
|
func (i *heapIterator) Close() error {
|
||
|
8 years ago
|
for i.heap.Len() > 0 {
|
||
|
|
if err := i.heap.Pop().(EntryIterator).Close(); err != nil {
|
||
|
8 years ago
|
return err
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
8 years ago
|
func NewQueryResponseIterator(resp *logproto.QueryResponse, direction logproto.Direction) EntryIterator {
|
||
|
8 years ago
|
is := make([]EntryIterator, 0, len(resp.Streams))
|
||
|
8 years ago
|
for i := range resp.Streams {
|
||
|
|
is = append(is, newStreamIterator(resp.Streams[i]))
|
||
|
|
}
|
||
|
8 years ago
|
return NewHeapIterator(is, direction)
|
||
|
8 years ago
|
}
|
||
|
|
|
||
|
|
type queryClientIterator struct {
|
||
|
8 years ago
|
client logproto.Querier_QueryClient
|
||
|
|
direction logproto.Direction
|
||
|
|
err error
|
||
|
|
curr EntryIterator
|
||
|
8 years ago
|
}
|
||
|
|
|
||
|
8 years ago
|
func NewQueryClientIterator(client logproto.Querier_QueryClient, direction logproto.Direction) EntryIterator {
|
||
|
8 years ago
|
return &queryClientIterator{
|
||
|
8 years ago
|
client: client,
|
||
|
|
direction: direction,
|
||
|
8 years ago
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (i *queryClientIterator) Next() bool {
|
||
|
|
for i.curr == nil || !i.curr.Next() {
|
||
|
|
batch, err := i.client.Recv()
|
||
|
|
if err == io.EOF {
|
||
|
|
return false
|
||
|
|
} else if err != nil {
|
||
|
|
i.err = err
|
||
|
|
return false
|
||
|
|
}
|
||
|
|
|
||
|
8 years ago
|
i.curr = NewQueryResponseIterator(batch, i.direction)
|
||
|
8 years ago
|
}
|
||
|
|
|
||
|
|
return true
|
||
|
|
}
|
||
|
|
|
||
|
|
func (i *queryClientIterator) Entry() logproto.Entry {
|
||
|
|
return i.curr.Entry()
|
||
|
|
}
|
||
|
|
|
||
|
|
func (i *queryClientIterator) Labels() string {
|
||
|
|
return i.curr.Labels()
|
||
|
|
}
|
||
|
|
|
||
|
|
func (i *queryClientIterator) Error() error {
|
||
|
|
return i.err
|
||
|
|
}
|
||
|
|
|
||
|
|
func (i *queryClientIterator) Close() error {
|
||
|
|
return i.client.CloseSend()
|
||
|
|
}
|
||
|
8 years ago
|
|
||
|
|
type regexpFilter struct {
|
||
|
|
re *regexp.Regexp
|
||
|
|
EntryIterator
|
||
|
|
}
|
||
|
|
|
||
|
|
func NewRegexpFilter(r string, i EntryIterator) (EntryIterator, error) {
|
||
|
|
re, err := regexp.Compile(r)
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
return ®expFilter{
|
||
|
|
re: re,
|
||
|
|
EntryIterator: i,
|
||
|
|
}, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (i *regexpFilter) Next() bool {
|
||
|
|
for i.EntryIterator.Next() {
|
||
|
|
if i.re.MatchString(i.Entry().Line) {
|
||
|
|
return true
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return false
|
||
|
|
}
|