Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
loki/pkg/iter/entry_iterator.go

877 lines
20 KiB

package iter
import (
"container/heap"
"context"
"io"
"sync"
Chunking (#10) * Add checkenc without serialisation for now. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify interface and add serialisatio` Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Move away from \xFF magic to something simple Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add serialisation and Deserialisation Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Modify interface to be closer to logish interface. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Fix race b/w append and iteration. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Make iterators honour bounds Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Remove locks as safety is assured externally Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Add checksums Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add code quotes around block design. * Split headBlock into it's own type. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify encoding and decoding. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Expose flags. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use the already existing EntryIterator interface Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use existing Chunk interface. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Review feedback Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Integrate the compressed chunk and add metrics Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
8 years ago
"time"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/util"
)
// EntryIterator iterates over entries in time-order.
type EntryIterator interface {
Iterator
Entry() logproto.Entry
}
// streamIterator iterates over entries in a stream.
type streamIterator struct {
i int
stream logproto.Stream
}
// NewStreamIterator iterates over entries in a stream.
func NewStreamIterator(stream logproto.Stream) EntryIterator {
return &streamIterator{
i: -1,
stream: stream,
}
}
func (i *streamIterator) Next() bool {
i.i++
return i.i < len(i.stream.Entries)
}
func (i *streamIterator) Error() error {
return nil
}
func (i *streamIterator) Labels() string {
return i.stream.Labels
}
func (i *streamIterator) StreamHash() uint64 { return i.stream.Hash }
func (i *streamIterator) Entry() logproto.Entry {
return i.stream.Entries[i.i]
}
func (i *streamIterator) Close() error {
return nil
}
type iteratorHeap []EntryIterator
func (h iteratorHeap) Len() int { return len(h) }
func (h iteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h iteratorHeap) Peek() EntryIterator { return h[0] }
func (h *iteratorHeap) Push(x interface{}) {
*h = append(*h, x.(EntryIterator))
}
func (h *iteratorHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
type iteratorSortHeap struct {
iteratorHeap
byAlphabetical bool
byAscendingTime bool
}
func (h iteratorSortHeap) Less(i, j int) bool {
t1, t2 := h.iteratorHeap[i].Entry().Timestamp.UnixNano(), h.iteratorHeap[j].Entry().Timestamp.UnixNano()
if t1 == t2 {
if h.byAlphabetical {
return h.iteratorHeap[i].Labels() < h.iteratorHeap[j].Labels()
}
return h.iteratorHeap[i].StreamHash() < h.iteratorHeap[j].StreamHash()
}
if h.byAscendingTime {
return t1 < t2
}
return t1 > t2
}
// HeapIterator iterates over a heap of iterators with ability to push new iterators and get some properties like time of entry at peek and len
// Not safe for concurrent use
type HeapIterator interface {
EntryIterator
Peek() time.Time
Len() int
Push(EntryIterator)
}
// mergeEntryIterator iterates over a heap of iterators and merge duplicate entries.
type mergeEntryIterator struct {
heap interface {
heap.Interface
Peek() EntryIterator
}
is []EntryIterator
prefetched bool
stats *stats.Context
tuples []tuple
currEntry entryWithLabels
errs []error
}
// NewMergeEntryIterator returns a new iterator which uses a heap to merge together entries for multiple iterators and deduplicate entries if any.
// The iterator only order and merge entries across given `is` iterators, it does not merge entries within individual iterator.
// This means using this iterator with a single iterator will result in the same result as the input iterator.
// If you don't need to deduplicate entries, use `NewSortEntryIterator` instead.
func NewMergeEntryIterator(ctx context.Context, is []EntryIterator, direction logproto.Direction) HeapIterator {
result := &mergeEntryIterator{is: is, stats: stats.FromContext(ctx)}
switch direction {
case logproto.BACKWARD:
result.heap = &iteratorSortHeap{iteratorHeap: make([]EntryIterator, 0, len(is)), byAscendingTime: false}
case logproto.FORWARD:
result.heap = &iteratorSortHeap{iteratorHeap: make([]EntryIterator, 0, len(is)), byAscendingTime: true}
default:
panic("bad direction")
}
result.tuples = make([]tuple, 0, len(is))
return result
}
// prefetch iterates over all inner iterators to merge together, calls Next() on
// each of them to prefetch the first entry and pushes of them - who are not
// empty - to the heap
func (i *mergeEntryIterator) prefetch() {
if i.prefetched {
return
}
i.prefetched = true
for _, it := range i.is {
i.requeue(it, false)
}
// We can now clear the list of input iterators to merge, given they have all
// been processed and the non empty ones have been pushed to the heap
i.is = nil
}
// requeue pushes the input ei EntryIterator to the heap, advancing it via an ei.Next()
// call unless the advanced input parameter is true. In this latter case it expects that
// the iterator has already been advanced before calling requeue().
//
// If the iterator has no more entries or an error occur while advancing it, the iterator
// is not pushed to the heap and any possible error captured, so that can be get via Error().
func (i *mergeEntryIterator) requeue(ei EntryIterator, advanced bool) {
if advanced || ei.Next() {
heap.Push(i.heap, ei)
return
}
if err := ei.Error(); err != nil {
i.errs = append(i.errs, err)
}
util.LogError("closing iterator", ei.Close)
}
func (i *mergeEntryIterator) Push(ei EntryIterator) {
i.requeue(ei, false)
}
type tuple struct {
logproto.Entry
EntryIterator
}
func (i *mergeEntryIterator) Next() bool {
i.prefetch()
if i.heap.Len() == 0 {
return false
}
// shortcut for the last iterator.
if i.heap.Len() == 1 {
i.currEntry.entry = i.heap.Peek().Entry()
i.currEntry.labels = i.heap.Peek().Labels()
i.currEntry.streamHash = i.heap.Peek().StreamHash()
if !i.heap.Peek().Next() {
i.heap.Pop()
}
return true
}
// We support multiple entries with the same timestamp, and we want to
// preserve their original order. We look at all the top entries in the
// heap with the same timestamp, and pop the ones whose common value
// occurs most often.
for i.heap.Len() > 0 {
next := i.heap.Peek()
entry := next.Entry()
if len(i.tuples) > 0 && (i.tuples[0].StreamHash() != next.StreamHash() || !i.tuples[0].Timestamp.Equal(entry.Timestamp)) {
break
}
heap.Pop(i.heap)
i.tuples = append(i.tuples, tuple{
Entry: entry,
EntryIterator: next,
})
}
// shortcut if we have a single tuple.
if len(i.tuples) == 1 {
i.currEntry.entry = i.tuples[0].Entry
i.currEntry.labels = i.tuples[0].Labels()
i.currEntry.streamHash = i.tuples[0].StreamHash()
i.requeue(i.tuples[0].EntryIterator, false)
i.tuples = i.tuples[:0]
return true
}
// Find in tuples which entry occurs most often which, due to quorum based
// replication, is guaranteed to be the correct next entry.
t := i.tuples[0]
i.currEntry.entry = t.Entry
i.currEntry.labels = t.Labels()
i.currEntry.streamHash = i.tuples[0].StreamHash()
// Requeue the iterators, advancing them if they were consumed.
for j := range i.tuples {
if i.tuples[j].Line != i.currEntry.entry.Line {
i.requeue(i.tuples[j].EntryIterator, true)
continue
}
// we count as duplicates only if the tuple is not the one (t) used to fill the current entry
if i.tuples[j] != t {
i.stats.AddDuplicates(1)
}
i.requeue(i.tuples[j].EntryIterator, false)
}
i.tuples = i.tuples[:0]
return true
}
func (i *mergeEntryIterator) Entry() logproto.Entry {
return i.currEntry.entry
}
func (i *mergeEntryIterator) Labels() string {
return i.currEntry.labels
}
func (i *mergeEntryIterator) StreamHash() uint64 { return i.currEntry.streamHash }
func (i *mergeEntryIterator) Error() error {
switch len(i.errs) {
case 0:
return nil
case 1:
return i.errs[0]
default:
return util.MultiError(i.errs)
}
}
func (i *mergeEntryIterator) Close() error {
for i.heap.Len() > 0 {
if err := i.heap.Pop().(EntryIterator).Close(); err != nil {
return err
}
}
i.tuples = nil
return nil
}
func (i *mergeEntryIterator) Peek() time.Time {
i.prefetch()
return i.heap.Peek().Entry().Timestamp
}
// Len returns the number of inner iterators on the heap, still having entries
func (i *mergeEntryIterator) Len() int {
i.prefetch()
return i.heap.Len()
}
type entrySortIterator struct {
heap interface {
heap.Interface
Peek() EntryIterator
}
is []EntryIterator
prefetched bool
currEntry entryWithLabels
errs []error
}
// NewSortEntryIterator returns a new EntryIterator that sorts entries by timestamp (depending on the direction) the input iterators.
// The iterator only order entries across given `is` iterators, it does not sort entries within individual iterator.
// This means using this iterator with a single iterator will result in the same result as the input iterator.
// When timestamp is equal, the iterator sorts samples by their label alphabetically.
func NewSortEntryIterator(is []EntryIterator, direction logproto.Direction) EntryIterator {
if len(is) == 0 {
return NoopIterator
}
if len(is) == 1 {
return is[0]
}
result := &entrySortIterator{is: is}
switch direction {
case logproto.BACKWARD:
result.heap = &iteratorSortHeap{iteratorHeap: make([]EntryIterator, 0, len(is)), byAscendingTime: false, byAlphabetical: true}
case logproto.FORWARD:
result.heap = &iteratorSortHeap{iteratorHeap: make([]EntryIterator, 0, len(is)), byAscendingTime: true, byAlphabetical: true}
default:
panic("bad direction")
}
return result
}
// init initialize the underlaying heap
func (i *entrySortIterator) init() {
if i.prefetched {
return
}
i.prefetched = true
for _, it := range i.is {
if it.Next() {
i.heap.Push(it)
continue
}
if err := it.Error(); err != nil {
i.errs = append(i.errs, err)
}
util.LogError("closing iterator", it.Close)
}
heap.Init(i.heap)
// We can now clear the list of input iterators to merge, given they have all
// been processed and the non empty ones have been pushed to the heap
i.is = nil
}
func (i *entrySortIterator) Next() bool {
i.init()
if i.heap.Len() == 0 {
return false
}
next := i.heap.Peek()
i.currEntry.entry = next.Entry()
i.currEntry.labels = next.Labels()
i.currEntry.streamHash = next.StreamHash()
// if the top iterator is empty, we remove it.
if !next.Next() {
heap.Pop(i.heap)
if err := next.Error(); err != nil {
i.errs = append(i.errs, err)
}
util.LogError("closing iterator", next.Close)
return true
}
if i.heap.Len() > 1 {
heap.Fix(i.heap, 0)
}
return true
}
func (i *entrySortIterator) Entry() logproto.Entry {
return i.currEntry.entry
}
func (i *entrySortIterator) Labels() string {
return i.currEntry.labels
}
func (i *entrySortIterator) StreamHash() uint64 {
return i.currEntry.streamHash
}
func (i *entrySortIterator) Error() error {
switch len(i.errs) {
case 0:
return nil
case 1:
return i.errs[0]
default:
return util.MultiError(i.errs)
}
}
func (i *entrySortIterator) Close() error {
for i.heap.Len() > 0 {
if err := i.heap.Pop().(EntryIterator).Close(); err != nil {
return err
}
}
return nil
}
// NewStreamsIterator returns an iterator over logproto.Stream
func NewStreamsIterator(streams []logproto.Stream, direction logproto.Direction) EntryIterator {
is := make([]EntryIterator, 0, len(streams))
for i := range streams {
is = append(is, NewStreamIterator(streams[i]))
}
return NewSortEntryIterator(is, direction)
}
// NewQueryResponseIterator returns an iterator over a QueryResponse.
func NewQueryResponseIterator(resp *logproto.QueryResponse, direction logproto.Direction) EntryIterator {
return NewStreamsIterator(resp.Streams, direction)
}
type queryClientIterator struct {
client logproto.Querier_QueryClient
direction logproto.Direction
err error
curr EntryIterator
}
// NewQueryClientIterator returns an iterator over a QueryClient.
func NewQueryClientIterator(client logproto.Querier_QueryClient, direction logproto.Direction) EntryIterator {
return &queryClientIterator{
client: client,
direction: direction,
}
}
func (i *queryClientIterator) Next() bool {
ctx := i.client.Context()
for i.curr == nil || !i.curr.Next() {
batch, err := i.client.Recv()
if err == io.EOF {
return false
} else if err != nil {
i.err = err
return false
}
stats.JoinIngesters(ctx, batch.Stats)
i.curr = NewQueryResponseIterator(batch, i.direction)
}
return true
}
func (i *queryClientIterator) Entry() logproto.Entry {
return i.curr.Entry()
}
func (i *queryClientIterator) Labels() string {
return i.curr.Labels()
}
func (i *queryClientIterator) StreamHash() uint64 { return i.curr.StreamHash() }
func (i *queryClientIterator) Error() error {
return i.err
}
func (i *queryClientIterator) Close() error {
return i.client.CloseSend()
}
Chunking (#10) * Add checkenc without serialisation for now. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify interface and add serialisatio` Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Move away from \xFF magic to something simple Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add serialisation and Deserialisation Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Modify interface to be closer to logish interface. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Fix race b/w append and iteration. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Make iterators honour bounds Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Remove locks as safety is assured externally Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Add checksums Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add code quotes around block design. * Split headBlock into it's own type. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify encoding and decoding. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Expose flags. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use the already existing EntryIterator interface Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use existing Chunk interface. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Review feedback Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Integrate the compressed chunk and add metrics Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
8 years ago
type nonOverlappingIterator struct {
iterators []EntryIterator
curr EntryIterator
}
// NewNonOverlappingIterator gives a chained iterator over a list of iterators.
func NewNonOverlappingIterator(iterators []EntryIterator) EntryIterator {
Chunking (#10) * Add checkenc without serialisation for now. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify interface and add serialisatio` Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Move away from \xFF magic to something simple Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add serialisation and Deserialisation Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Modify interface to be closer to logish interface. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Fix race b/w append and iteration. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Make iterators honour bounds Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Remove locks as safety is assured externally Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Add checksums Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add code quotes around block design. * Split headBlock into it's own type. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify encoding and decoding. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Expose flags. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use the already existing EntryIterator interface Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use existing Chunk interface. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Review feedback Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Integrate the compressed chunk and add metrics Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
8 years ago
return &nonOverlappingIterator{
iterators: iterators,
}
}
func (i *nonOverlappingIterator) Next() bool {
for i.curr == nil || !i.curr.Next() {
if len(i.iterators) == 0 {
if i.curr != nil {
i.curr.Close()
}
Chunking (#10) * Add checkenc without serialisation for now. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify interface and add serialisatio` Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Move away from \xFF magic to something simple Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add serialisation and Deserialisation Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Modify interface to be closer to logish interface. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Fix race b/w append and iteration. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Make iterators honour bounds Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Remove locks as safety is assured externally Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Add checksums Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add code quotes around block design. * Split headBlock into it's own type. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify encoding and decoding. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Expose flags. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use the already existing EntryIterator interface Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use existing Chunk interface. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Review feedback Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Integrate the compressed chunk and add metrics Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
8 years ago
return false
}
if i.curr != nil {
i.curr.Close()
}
i.curr, i.iterators = i.iterators[0], i.iterators[1:]
Chunking (#10) * Add checkenc without serialisation for now. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify interface and add serialisatio` Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Move away from \xFF magic to something simple Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add serialisation and Deserialisation Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Modify interface to be closer to logish interface. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Fix race b/w append and iteration. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Make iterators honour bounds Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Remove locks as safety is assured externally Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Add checksums Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add code quotes around block design. * Split headBlock into it's own type. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify encoding and decoding. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Expose flags. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use the already existing EntryIterator interface Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use existing Chunk interface. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Review feedback Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Integrate the compressed chunk and add metrics Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
8 years ago
}
return true
}
func (i *nonOverlappingIterator) Entry() logproto.Entry {
return i.curr.Entry()
}
func (i *nonOverlappingIterator) Labels() string {
if i.curr == nil {
return ""
}
return i.curr.Labels()
Chunking (#10) * Add checkenc without serialisation for now. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify interface and add serialisatio` Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Move away from \xFF magic to something simple Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add serialisation and Deserialisation Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Modify interface to be closer to logish interface. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Fix race b/w append and iteration. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Make iterators honour bounds Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Remove locks as safety is assured externally Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Add checksums Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add code quotes around block design. * Split headBlock into it's own type. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify encoding and decoding. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Expose flags. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use the already existing EntryIterator interface Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use existing Chunk interface. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Review feedback Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Integrate the compressed chunk and add metrics Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
8 years ago
}
func (i *nonOverlappingIterator) StreamHash() uint64 {
if i.curr == nil {
return 0
}
return i.curr.StreamHash()
}
Chunking (#10) * Add checkenc without serialisation for now. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify interface and add serialisatio` Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Move away from \xFF magic to something simple Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add serialisation and Deserialisation Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Modify interface to be closer to logish interface. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Fix race b/w append and iteration. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Make iterators honour bounds Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Remove locks as safety is assured externally Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Add checksums Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add code quotes around block design. * Split headBlock into it's own type. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify encoding and decoding. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Expose flags. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use the already existing EntryIterator interface Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use existing Chunk interface. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Review feedback Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Integrate the compressed chunk and add metrics Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
8 years ago
func (i *nonOverlappingIterator) Error() error {
if i.curr == nil {
return nil
}
return i.curr.Error()
Chunking (#10) * Add checkenc without serialisation for now. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify interface and add serialisatio` Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Move away from \xFF magic to something simple Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add serialisation and Deserialisation Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Modify interface to be closer to logish interface. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Fix race b/w append and iteration. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Make iterators honour bounds Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Remove locks as safety is assured externally Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Add checksums Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add code quotes around block design. * Split headBlock into it's own type. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify encoding and decoding. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Expose flags. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use the already existing EntryIterator interface Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use existing Chunk interface. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Review feedback Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Integrate the compressed chunk and add metrics Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
8 years ago
}
func (i *nonOverlappingIterator) Close() error {
if i.curr != nil {
i.curr.Close()
}
for _, iter := range i.iterators {
iter.Close()
}
i.iterators = nil
Chunking (#10) * Add checkenc without serialisation for now. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify interface and add serialisatio` Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Move away from \xFF magic to something simple Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add serialisation and Deserialisation Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Modify interface to be closer to logish interface. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Fix race b/w append and iteration. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Make iterators honour bounds Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Remove locks as safety is assured externally Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Add checksums Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add code quotes around block design. * Split headBlock into it's own type. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify encoding and decoding. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Expose flags. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use the already existing EntryIterator interface Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use existing Chunk interface. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Review feedback Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Integrate the compressed chunk and add metrics Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
8 years ago
return nil
}
type timeRangedIterator struct {
EntryIterator
mint, maxt time.Time
}
// NewTimeRangedIterator returns an iterator which filters entries by time range.
// Note: Only works with iterators that go forwards.
Chunking (#10) * Add checkenc without serialisation for now. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify interface and add serialisatio` Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Move away from \xFF magic to something simple Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add serialisation and Deserialisation Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Modify interface to be closer to logish interface. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Fix race b/w append and iteration. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Make iterators honour bounds Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Remove locks as safety is assured externally Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Add checksums Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add code quotes around block design. * Split headBlock into it's own type. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify encoding and decoding. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Expose flags. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use the already existing EntryIterator interface Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use existing Chunk interface. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Review feedback Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Integrate the compressed chunk and add metrics Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
8 years ago
func NewTimeRangedIterator(it EntryIterator, mint, maxt time.Time) EntryIterator {
return &timeRangedIterator{
EntryIterator: it,
mint: mint,
maxt: maxt,
}
}
func (i *timeRangedIterator) Next() bool {
ok := i.EntryIterator.Next()
if !ok {
i.EntryIterator.Close()
return ok
}
Chunking (#10) * Add checkenc without serialisation for now. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify interface and add serialisatio` Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Move away from \xFF magic to something simple Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add serialisation and Deserialisation Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Modify interface to be closer to logish interface. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Fix race b/w append and iteration. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Make iterators honour bounds Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Remove locks as safety is assured externally Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Add checksums Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add code quotes around block design. * Split headBlock into it's own type. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify encoding and decoding. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Expose flags. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use the already existing EntryIterator interface Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use existing Chunk interface. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Review feedback Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Integrate the compressed chunk and add metrics Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
8 years ago
ts := i.EntryIterator.Entry().Timestamp
for ok && i.mint.After(ts) {
ok = i.EntryIterator.Next()
if !ok {
continue
}
Chunking (#10) * Add checkenc without serialisation for now. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify interface and add serialisatio` Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Move away from \xFF magic to something simple Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add serialisation and Deserialisation Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Modify interface to be closer to logish interface. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Fix race b/w append and iteration. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Make iterators honour bounds Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Remove locks as safety is assured externally Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Add checksums Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add code quotes around block design. * Split headBlock into it's own type. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify encoding and decoding. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Expose flags. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use the already existing EntryIterator interface Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use existing Chunk interface. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Review feedback Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Integrate the compressed chunk and add metrics Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
8 years ago
ts = i.EntryIterator.Entry().Timestamp
}
if ok {
if ts.Equal(i.mint) { // The mint is inclusive
return true
}
if i.maxt.Before(ts) || i.maxt.Equal(ts) { // The maxt is exclusive.
ok = false
}
Chunking (#10) * Add checkenc without serialisation for now. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify interface and add serialisatio` Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Move away from \xFF magic to something simple Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add serialisation and Deserialisation Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Modify interface to be closer to logish interface. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Fix race b/w append and iteration. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Make iterators honour bounds Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Remove locks as safety is assured externally Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Add checksums Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add code quotes around block design. * Split headBlock into it's own type. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify encoding and decoding. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Expose flags. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use the already existing EntryIterator interface Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use existing Chunk interface. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Review feedback Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Integrate the compressed chunk and add metrics Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
8 years ago
}
if !ok {
i.EntryIterator.Close()
}
Chunking (#10) * Add checkenc without serialisation for now. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify interface and add serialisatio` Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Move away from \xFF magic to something simple Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add serialisation and Deserialisation Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Modify interface to be closer to logish interface. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Fix race b/w append and iteration. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Make iterators honour bounds Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Remove locks as safety is assured externally Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Add checksums Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add code quotes around block design. * Split headBlock into it's own type. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify encoding and decoding. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Expose flags. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use the already existing EntryIterator interface Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use existing Chunk interface. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Review feedback Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Integrate the compressed chunk and add metrics Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
8 years ago
return ok
}
type entryWithLabels struct {
entry logproto.Entry
labels string
streamHash uint64
}
type reverseIterator struct {
iter EntryIterator
cur entryWithLabels
entriesWithLabels []entryWithLabels
loaded bool
limit uint32
Chunking (#10) * Add checkenc without serialisation for now. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify interface and add serialisatio` Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Move away from \xFF magic to something simple Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add serialisation and Deserialisation Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Modify interface to be closer to logish interface. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Fix race b/w append and iteration. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Make iterators honour bounds Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Remove locks as safety is assured externally Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Add checksums Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add code quotes around block design. * Split headBlock into it's own type. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify encoding and decoding. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Expose flags. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use the already existing EntryIterator interface Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use existing Chunk interface. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Review feedback Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Integrate the compressed chunk and add metrics Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
8 years ago
}
// NewReversedIter returns an iterator which loads all or up to N entries
// of an existing iterator, and then iterates over them backward.
// Preload entries when they are being queried with a timeout.
func NewReversedIter(it EntryIterator, limit uint32, preload bool) (EntryIterator, error) {
iter, err := &reverseIterator{
iter: it,
entriesWithLabels: make([]entryWithLabels, 0, 1024),
limit: limit,
}, it.Error()
if err != nil {
return nil, err
}
if preload {
iter.load()
}
return iter, nil
}
Chunking (#10) * Add checkenc without serialisation for now. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify interface and add serialisatio` Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Move away from \xFF magic to something simple Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add serialisation and Deserialisation Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Modify interface to be closer to logish interface. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Fix race b/w append and iteration. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Make iterators honour bounds Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Remove locks as safety is assured externally Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Add checksums Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add code quotes around block design. * Split headBlock into it's own type. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify encoding and decoding. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Expose flags. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use the already existing EntryIterator interface Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use existing Chunk interface. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Review feedback Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Integrate the compressed chunk and add metrics Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
8 years ago
func (i *reverseIterator) load() {
if !i.loaded {
i.loaded = true
for count := uint32(0); (i.limit == 0 || count < i.limit) && i.iter.Next(); count++ {
i.entriesWithLabels = append(i.entriesWithLabels, entryWithLabels{i.iter.Entry(), i.iter.Labels(), i.iter.StreamHash()})
}
i.iter.Close()
}
Chunking (#10) * Add checkenc without serialisation for now. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify interface and add serialisatio` Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Move away from \xFF magic to something simple Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add serialisation and Deserialisation Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Modify interface to be closer to logish interface. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Fix race b/w append and iteration. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Make iterators honour bounds Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Remove locks as safety is assured externally Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Add checksums Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add code quotes around block design. * Split headBlock into it's own type. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify encoding and decoding. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Expose flags. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use the already existing EntryIterator interface Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use existing Chunk interface. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Review feedback Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Integrate the compressed chunk and add metrics Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
8 years ago
}
func (i *reverseIterator) Next() bool {
i.load()
if len(i.entriesWithLabels) == 0 {
i.entriesWithLabels = nil
Chunking (#10) * Add checkenc without serialisation for now. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify interface and add serialisatio` Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Move away from \xFF magic to something simple Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add serialisation and Deserialisation Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Modify interface to be closer to logish interface. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Fix race b/w append and iteration. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Make iterators honour bounds Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Remove locks as safety is assured externally Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Add checksums Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add code quotes around block design. * Split headBlock into it's own type. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify encoding and decoding. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Expose flags. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use the already existing EntryIterator interface Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use existing Chunk interface. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Review feedback Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Integrate the compressed chunk and add metrics Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
8 years ago
return false
}
i.cur, i.entriesWithLabels = i.entriesWithLabels[len(i.entriesWithLabels)-1], i.entriesWithLabels[:len(i.entriesWithLabels)-1]
Chunking (#10) * Add checkenc without serialisation for now. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify interface and add serialisatio` Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Move away from \xFF magic to something simple Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add serialisation and Deserialisation Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Modify interface to be closer to logish interface. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Fix race b/w append and iteration. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Make iterators honour bounds Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Remove locks as safety is assured externally Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Add checksums Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add code quotes around block design. * Split headBlock into it's own type. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify encoding and decoding. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Expose flags. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use the already existing EntryIterator interface Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use existing Chunk interface. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Review feedback Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Integrate the compressed chunk and add metrics Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
8 years ago
return true
}
func (i *reverseIterator) Entry() logproto.Entry {
return i.cur.entry
Chunking (#10) * Add checkenc without serialisation for now. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify interface and add serialisatio` Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Move away from \xFF magic to something simple Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add serialisation and Deserialisation Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Modify interface to be closer to logish interface. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Fix race b/w append and iteration. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Make iterators honour bounds Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Remove locks as safety is assured externally Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Add checksums Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add code quotes around block design. * Split headBlock into it's own type. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify encoding and decoding. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Expose flags. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use the already existing EntryIterator interface Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use existing Chunk interface. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Review feedback Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Integrate the compressed chunk and add metrics Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
8 years ago
}
func (i *reverseIterator) Labels() string {
return i.cur.labels
}
Chunking (#10) * Add checkenc without serialisation for now. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify interface and add serialisatio` Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Move away from \xFF magic to something simple Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add serialisation and Deserialisation Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Modify interface to be closer to logish interface. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Fix race b/w append and iteration. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Make iterators honour bounds Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Remove locks as safety is assured externally Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Add checksums Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add code quotes around block design. * Split headBlock into it's own type. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify encoding and decoding. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Expose flags. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use the already existing EntryIterator interface Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use existing Chunk interface. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Review feedback Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Integrate the compressed chunk and add metrics Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
8 years ago
func (i *reverseIterator) StreamHash() uint64 {
return i.cur.streamHash
}
func (i *reverseIterator) Error() error { return nil }
Chunking (#10) * Add checkenc without serialisation for now. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify interface and add serialisatio` Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Move away from \xFF magic to something simple Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add serialisation and Deserialisation Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Modify interface to be closer to logish interface. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Fix race b/w append and iteration. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Make iterators honour bounds Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Remove locks as safety is assured externally Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Add checksums Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add code quotes around block design. * Split headBlock into it's own type. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify encoding and decoding. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Expose flags. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use the already existing EntryIterator interface Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use existing Chunk interface. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Review feedback Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Integrate the compressed chunk and add metrics Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
8 years ago
func (i *reverseIterator) Close() error {
if !i.loaded {
return i.iter.Close()
}
return nil
Chunking (#10) * Add checkenc without serialisation for now. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify interface and add serialisatio` Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Move away from \xFF magic to something simple Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add serialisation and Deserialisation Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Modify interface to be closer to logish interface. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Fix race b/w append and iteration. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Make iterators honour bounds Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Remove locks as safety is assured externally Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Add checksums Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add code quotes around block design. * Split headBlock into it's own type. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify encoding and decoding. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Expose flags. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use the already existing EntryIterator interface Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use existing Chunk interface. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Review feedback Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Integrate the compressed chunk and add metrics Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
8 years ago
}
var entryBufferPool = sync.Pool{
New: func() interface{} {
return &entryBuffer{
LogQL: Labels and Metrics Extraction (#2769) * Adds logfmt, regexp and json logql parser Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * hook the ast with parsers. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * hook parser with memchunk. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * hook parser with the storage. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * hook parser with ingesters Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * fixes all tests Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Refactor to pipeline and implement ast parsing. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes the lexer for duration and range Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes all tests and add some for label filters Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add label and line format. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add tests for fmt label and line with validations. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Polishing parsers and add some more test cases Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Finish the unwrap parser, still need to add more tests Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Indent this hell. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Moar tests and it works. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add more tests which lead me to find a bug in the lexer Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add more tests and fix all engine tests Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes match stage in promtail pipelines. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Hook Pipeline into ingester, tailer and storage. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Correctly setup sharding for logqlv2 Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes precedences issue with label filters and add moar tests :v: * Adds quantile_over_time, grouping for non associate range aggregation parsing and moar tests * Extract with grouping * Adds parsing duration on unwrap * Improve the lexer to support more common identifier as functions. Also add duration convertion for unwrap. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes the frontend logs to include org_id. The auth middleware was happening after the stats one and so org_id was not set :facepalm:. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Support byte sizes in label filters. This patch extends the duration label filter with support for byte sizes such as `1kB` and `42MiB`. * Wip on error handling. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes json parser with prometheus label name rules. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * fixup! Support byte sizes in label filters. * Wip error handling, commit before big refactoring. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Refactoring in progress. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Work in progress. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Got something that builds and throw __error__ labels properly now. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add error handling + fixes groupins and post filtering. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * 400 on pipeline errors. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes a races in the log pipeline. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Unsure the key is parsable and valid. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Cleanup and code documentation. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Lint. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Lint. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes frontend handler. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes old test. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fix go1.15 local failing test. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> Co-authored-by: Karsten Jeschkies <k@jeschkies.xyz>
5 years ago
entries: make([]entryWithLabels, 0, 1024),
}
},
}
type entryBuffer struct {
LogQL: Labels and Metrics Extraction (#2769) * Adds logfmt, regexp and json logql parser Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * hook the ast with parsers. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * hook parser with memchunk. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * hook parser with the storage. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * hook parser with ingesters Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * fixes all tests Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Refactor to pipeline and implement ast parsing. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes the lexer for duration and range Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes all tests and add some for label filters Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add label and line format. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add tests for fmt label and line with validations. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Polishing parsers and add some more test cases Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Finish the unwrap parser, still need to add more tests Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Indent this hell. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Moar tests and it works. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add more tests which lead me to find a bug in the lexer Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add more tests and fix all engine tests Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes match stage in promtail pipelines. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Hook Pipeline into ingester, tailer and storage. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Correctly setup sharding for logqlv2 Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes precedences issue with label filters and add moar tests :v: * Adds quantile_over_time, grouping for non associate range aggregation parsing and moar tests * Extract with grouping * Adds parsing duration on unwrap * Improve the lexer to support more common identifier as functions. Also add duration convertion for unwrap. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes the frontend logs to include org_id. The auth middleware was happening after the stats one and so org_id was not set :facepalm:. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Support byte sizes in label filters. This patch extends the duration label filter with support for byte sizes such as `1kB` and `42MiB`. * Wip on error handling. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes json parser with prometheus label name rules. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * fixup! Support byte sizes in label filters. * Wip error handling, commit before big refactoring. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Refactoring in progress. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Work in progress. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Got something that builds and throw __error__ labels properly now. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add error handling + fixes groupins and post filtering. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * 400 on pipeline errors. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes a races in the log pipeline. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Unsure the key is parsable and valid. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Cleanup and code documentation. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Lint. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Lint. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes frontend handler. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes old test. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fix go1.15 local failing test. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> Co-authored-by: Karsten Jeschkies <k@jeschkies.xyz>
5 years ago
entries []entryWithLabels
}
type reverseEntryIterator struct {
iter EntryIterator
LogQL: Labels and Metrics Extraction (#2769) * Adds logfmt, regexp and json logql parser Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * hook the ast with parsers. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * hook parser with memchunk. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * hook parser with the storage. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * hook parser with ingesters Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * fixes all tests Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Refactor to pipeline and implement ast parsing. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes the lexer for duration and range Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes all tests and add some for label filters Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add label and line format. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add tests for fmt label and line with validations. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Polishing parsers and add some more test cases Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Finish the unwrap parser, still need to add more tests Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Indent this hell. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Moar tests and it works. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add more tests which lead me to find a bug in the lexer Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add more tests and fix all engine tests Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes match stage in promtail pipelines. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Hook Pipeline into ingester, tailer and storage. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Correctly setup sharding for logqlv2 Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes precedences issue with label filters and add moar tests :v: * Adds quantile_over_time, grouping for non associate range aggregation parsing and moar tests * Extract with grouping * Adds parsing duration on unwrap * Improve the lexer to support more common identifier as functions. Also add duration convertion for unwrap. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes the frontend logs to include org_id. The auth middleware was happening after the stats one and so org_id was not set :facepalm:. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Support byte sizes in label filters. This patch extends the duration label filter with support for byte sizes such as `1kB` and `42MiB`. * Wip on error handling. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes json parser with prometheus label name rules. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * fixup! Support byte sizes in label filters. * Wip error handling, commit before big refactoring. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Refactoring in progress. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Work in progress. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Got something that builds and throw __error__ labels properly now. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add error handling + fixes groupins and post filtering. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * 400 on pipeline errors. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes a races in the log pipeline. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Unsure the key is parsable and valid. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Cleanup and code documentation. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Lint. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Lint. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes frontend handler. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes old test. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fix go1.15 local failing test. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> Co-authored-by: Karsten Jeschkies <k@jeschkies.xyz>
5 years ago
cur entryWithLabels
buf *entryBuffer
loaded bool
}
// NewEntryReversedIter returns an iterator which loads all entries and iterates backward.
// The labels of entries is always empty.
func NewEntryReversedIter(it EntryIterator) (EntryIterator, error) {
iter, err := &reverseEntryIterator{
iter: it,
buf: entryBufferPool.Get().(*entryBuffer),
}, it.Error()
if err != nil {
return nil, err
}
return iter, nil
}
func (i *reverseEntryIterator) load() {
if !i.loaded {
i.loaded = true
for i.iter.Next() {
i.buf.entries = append(i.buf.entries, entryWithLabels{i.iter.Entry(), i.iter.Labels(), i.iter.StreamHash()})
}
i.iter.Close()
}
}
func (i *reverseEntryIterator) Next() bool {
i.load()
if i.buf == nil || len(i.buf.entries) == 0 {
i.release()
return false
}
i.cur, i.buf.entries = i.buf.entries[len(i.buf.entries)-1], i.buf.entries[:len(i.buf.entries)-1]
return true
}
func (i *reverseEntryIterator) Entry() logproto.Entry {
LogQL: Labels and Metrics Extraction (#2769) * Adds logfmt, regexp and json logql parser Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * hook the ast with parsers. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * hook parser with memchunk. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * hook parser with the storage. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * hook parser with ingesters Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * fixes all tests Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Refactor to pipeline and implement ast parsing. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes the lexer for duration and range Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes all tests and add some for label filters Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add label and line format. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add tests for fmt label and line with validations. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Polishing parsers and add some more test cases Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Finish the unwrap parser, still need to add more tests Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Indent this hell. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Moar tests and it works. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add more tests which lead me to find a bug in the lexer Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add more tests and fix all engine tests Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes match stage in promtail pipelines. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Hook Pipeline into ingester, tailer and storage. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Correctly setup sharding for logqlv2 Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes precedences issue with label filters and add moar tests :v: * Adds quantile_over_time, grouping for non associate range aggregation parsing and moar tests * Extract with grouping * Adds parsing duration on unwrap * Improve the lexer to support more common identifier as functions. Also add duration convertion for unwrap. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes the frontend logs to include org_id. The auth middleware was happening after the stats one and so org_id was not set :facepalm:. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Support byte sizes in label filters. This patch extends the duration label filter with support for byte sizes such as `1kB` and `42MiB`. * Wip on error handling. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes json parser with prometheus label name rules. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * fixup! Support byte sizes in label filters. * Wip error handling, commit before big refactoring. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Refactoring in progress. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Work in progress. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Got something that builds and throw __error__ labels properly now. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add error handling + fixes groupins and post filtering. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * 400 on pipeline errors. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes a races in the log pipeline. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Unsure the key is parsable and valid. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Cleanup and code documentation. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Lint. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Lint. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes frontend handler. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes old test. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fix go1.15 local failing test. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> Co-authored-by: Karsten Jeschkies <k@jeschkies.xyz>
5 years ago
return i.cur.entry
}
func (i *reverseEntryIterator) Labels() string {
LogQL: Labels and Metrics Extraction (#2769) * Adds logfmt, regexp and json logql parser Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * hook the ast with parsers. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * hook parser with memchunk. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * hook parser with the storage. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * hook parser with ingesters Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * fixes all tests Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Refactor to pipeline and implement ast parsing. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes the lexer for duration and range Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes all tests and add some for label filters Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add label and line format. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add tests for fmt label and line with validations. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Polishing parsers and add some more test cases Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Finish the unwrap parser, still need to add more tests Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Indent this hell. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Moar tests and it works. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add more tests which lead me to find a bug in the lexer Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add more tests and fix all engine tests Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes match stage in promtail pipelines. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Hook Pipeline into ingester, tailer and storage. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Correctly setup sharding for logqlv2 Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes precedences issue with label filters and add moar tests :v: * Adds quantile_over_time, grouping for non associate range aggregation parsing and moar tests * Extract with grouping * Adds parsing duration on unwrap * Improve the lexer to support more common identifier as functions. Also add duration convertion for unwrap. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes the frontend logs to include org_id. The auth middleware was happening after the stats one and so org_id was not set :facepalm:. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Support byte sizes in label filters. This patch extends the duration label filter with support for byte sizes such as `1kB` and `42MiB`. * Wip on error handling. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes json parser with prometheus label name rules. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * fixup! Support byte sizes in label filters. * Wip error handling, commit before big refactoring. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Refactoring in progress. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Work in progress. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Got something that builds and throw __error__ labels properly now. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add error handling + fixes groupins and post filtering. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * 400 on pipeline errors. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes a races in the log pipeline. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Unsure the key is parsable and valid. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Cleanup and code documentation. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Lint. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Lint. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes frontend handler. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes old test. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fix go1.15 local failing test. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> Co-authored-by: Karsten Jeschkies <k@jeschkies.xyz>
5 years ago
return i.cur.labels
}
func (i *reverseEntryIterator) StreamHash() uint64 {
return i.cur.streamHash
}
func (i *reverseEntryIterator) Error() error { return nil }
func (i *reverseEntryIterator) release() {
if i.buf == nil {
return
}
if i.buf.entries != nil {
// preserve the underlying slice before releasing to pool
i.buf.entries = i.buf.entries[:0]
}
entryBufferPool.Put(i.buf)
i.buf = nil
}
func (i *reverseEntryIterator) Close() error {
i.release()
if !i.loaded {
return i.iter.Close()
}
return nil
}
// ReadBatch reads a set of entries off an iterator.
func ReadBatch(i EntryIterator, size uint32) (*logproto.QueryResponse, uint32, error) {
streams := map[string]*logproto.Stream{}
respSize := uint32(0)
for ; respSize < size && i.Next(); respSize++ {
labels, hash, entry := i.Labels(), i.StreamHash(), i.Entry()
stream, ok := streams[labels]
if !ok {
stream = &logproto.Stream{
Labels: labels,
Hash: hash,
}
streams[labels] = stream
}
stream.Entries = append(stream.Entries, entry)
}
result := logproto.QueryResponse{
Streams: make([]logproto.Stream, 0, len(streams)),
}
for _, stream := range streams {
result.Streams = append(result.Streams, *stream)
}
return &result, respSize, i.Error()
}
type peekingEntryIterator struct {
iter EntryIterator
cache *entryWithLabels
next *entryWithLabels
}
// PeekingEntryIterator is an entry iterator that can look ahead an entry
// using `Peek` without advancing its cursor.
type PeekingEntryIterator interface {
EntryIterator
Peek() (string, logproto.Entry, bool)
}
// NewPeekingIterator creates a new peeking iterator.
func NewPeekingIterator(iter EntryIterator) PeekingEntryIterator {
// initialize the next entry so we can peek right from the start.
var cache *entryWithLabels
next := &entryWithLabels{}
if iter.Next() {
cache = &entryWithLabels{
entry: iter.Entry(),
labels: iter.Labels(),
streamHash: iter.StreamHash(),
}
next.entry = cache.entry
next.labels = cache.labels
}
return &peekingEntryIterator{
iter: iter,
cache: cache,
next: next,
}
}
// Next implements `EntryIterator`
func (it *peekingEntryIterator) Next() bool {
if it.cache != nil {
it.next.entry = it.cache.entry
it.next.labels = it.cache.labels
it.next.streamHash = it.cache.streamHash
it.cacheNext()
return true
}
return false
}
// cacheNext caches the next element if it exists.
func (it *peekingEntryIterator) cacheNext() {
if it.iter.Next() {
it.cache.entry = it.iter.Entry()
it.cache.labels = it.iter.Labels()
it.cache.streamHash = it.iter.StreamHash()
return
}
// nothing left removes the cached entry
it.cache = nil
}
// Peek implements `PeekingEntryIterator`
func (it *peekingEntryIterator) Peek() (string, logproto.Entry, bool) {
if it.cache != nil {
return it.cache.labels, it.cache.entry, true
}
return "", logproto.Entry{}, false
}
// Labels implements `EntryIterator`
func (it *peekingEntryIterator) Labels() string {
if it.next != nil {
return it.next.labels
}
return ""
}
func (it *peekingEntryIterator) StreamHash() uint64 {
if it.next != nil {
return it.next.streamHash
}
return 0
}
// Entry implements `EntryIterator`
func (it *peekingEntryIterator) Entry() logproto.Entry {
if it.next != nil {
return it.next.entry
}
return logproto.Entry{}
}
// Error implements `EntryIterator`
func (it *peekingEntryIterator) Error() error {
return it.iter.Error()
}
// Close implements `EntryIterator`
func (it *peekingEntryIterator) Close() error {
return it.iter.Close()
}