mirror of https://github.com/grafana/loki
Improve metric queries by computing samples at the edges. (#2293)
* First pass breaking the code appart. Wondering how we're going to achieve fast mutation of labels. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Work in progress. I realize I need hash for deduping lines. going to benchmark somes. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Tested some hash and decided which one to use. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Wip Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Starting working on ingester. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Trying to find a better hash function. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * More hash testing we have a winner. xxhash it is. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Settle on xxhash Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Better params interfacing. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add interface for queryparams for things that exist in both type of params. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add storage sample iterator implementations. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixing tests and verifying we don't get collions for the hashing method. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixing ingesters tests and refactoring utility function/tests. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixing and testing that stats are still well computed. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixing more tests. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * More engine tests finished. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes sharding evaluator. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes more engine tests. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fix error tests in the engine. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Finish fixing all tests. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes a bug where extractor was not passed in correctly. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add notes about upgrade. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Renamed and fix a bug. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add memchunk tests and starting test for sampleIterator. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Test heap sample iterator. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * working on test. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Finishing testing all new iterators. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Making sure all store functions are tested. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Benchmark and verify everything is working well. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Make the linter happy. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * use xxhash v2. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fix a flaky test because of map. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * go.mod. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> Co-authored-by: Edward Welch <edward.welch@grafana.com>pull/2318/head
parent
0b5996021f
commit
0be64fcb34
@ -0,0 +1,73 @@ |
||||
package chunkenc |
||||
|
||||
import ( |
||||
"hash/fnv" |
||||
"hash/maphash" |
||||
"testing" |
||||
|
||||
"github.com/cespare/xxhash/v2" |
||||
"github.com/segmentio/fasthash/fnv1a" |
||||
"github.com/stretchr/testify/require" |
||||
|
||||
"github.com/grafana/loki/pkg/chunkenc/testdata" |
||||
) |
||||
|
||||
var res uint64 |
||||
|
||||
func Benchmark_fnv64a(b *testing.B) { |
||||
for n := 0; n < b.N; n++ { |
||||
for i := 0; i < len(testdata.LogsBytes); i++ { |
||||
h := fnv.New64a() |
||||
_, _ = h.Write(testdata.LogsBytes[i]) |
||||
res = h.Sum64() |
||||
} |
||||
} |
||||
} |
||||
|
||||
func Benchmark_fnv64a_third_party(b *testing.B) { |
||||
for n := 0; n < b.N; n++ { |
||||
for i := 0; i < len(testdata.LogsBytes); i++ { |
||||
res = fnv1a.HashBytes64(testdata.LogsBytes[i]) |
||||
} |
||||
} |
||||
} |
||||
|
||||
func Benchmark_xxhash(b *testing.B) { |
||||
for n := 0; n < b.N; n++ { |
||||
for i := 0; i < len(testdata.LogsBytes); i++ { |
||||
res = xxhash.Sum64(testdata.LogsBytes[i]) |
||||
} |
||||
} |
||||
} |
||||
|
||||
func Benchmark_hashmap(b *testing.B) { |
||||
// I discarded hashmap/map as it will compute different value on different binary for the same entry
|
||||
var h maphash.Hash |
||||
for n := 0; n < b.N; n++ { |
||||
for i := 0; i < len(testdata.LogsBytes); i++ { |
||||
h.SetSeed(maphash.MakeSeed()) |
||||
_, _ = h.Write(testdata.LogsBytes[i]) |
||||
res = h.Sum64() |
||||
} |
||||
} |
||||
} |
||||
|
||||
func Test_xxhash_integrity(t *testing.T) { |
||||
data := []uint64{} |
||||
|
||||
for i := 0; i < len(testdata.LogsBytes); i++ { |
||||
data = append(data, xxhash.Sum64(testdata.LogsBytes[i])) |
||||
} |
||||
|
||||
for i := 0; i < len(testdata.LogsBytes); i++ { |
||||
require.Equal(t, data[i], xxhash.Sum64(testdata.LogsBytes[i])) |
||||
} |
||||
|
||||
unique := map[uint64]struct{}{} |
||||
for i := 0; i < len(testdata.LogsBytes); i++ { |
||||
_, ok := unique[xxhash.Sum64(testdata.LogsBytes[i])] |
||||
require.False(t, ok, string(testdata.LogsBytes[i])) // all lines have been made unique
|
||||
unique[xxhash.Sum64(testdata.LogsBytes[i])] = struct{}{} |
||||
} |
||||
|
||||
} |
||||
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,512 @@ |
||||
package iter |
||||
|
||||
import ( |
||||
"container/heap" |
||||
"context" |
||||
"fmt" |
||||
"io" |
||||
|
||||
"github.com/grafana/loki/pkg/helpers" |
||||
"github.com/grafana/loki/pkg/logproto" |
||||
"github.com/grafana/loki/pkg/logql/stats" |
||||
) |
||||
|
||||
// SampleIterator iterates over samples in time-order.
|
||||
type SampleIterator interface { |
||||
Next() bool |
||||
// todo(ctovena) we should add `Seek(t int64) bool`
|
||||
// This way we can skip when ranging over samples.
|
||||
Sample() logproto.Sample |
||||
Labels() string |
||||
Error() error |
||||
Close() error |
||||
} |
||||
|
||||
// PeekingSampleIterator is a sample iterator that can peek sample without moving the current sample.
|
||||
type PeekingSampleIterator interface { |
||||
SampleIterator |
||||
Peek() (string, logproto.Sample, bool) |
||||
} |
||||
|
||||
type peekingSampleIterator struct { |
||||
iter SampleIterator |
||||
|
||||
cache *sampleWithLabels |
||||
next *sampleWithLabels |
||||
} |
||||
|
||||
type sampleWithLabels struct { |
||||
logproto.Sample |
||||
labels string |
||||
} |
||||
|
||||
func NewPeekingSampleIterator(iter SampleIterator) PeekingSampleIterator { |
||||
// initialize the next entry so we can peek right from the start.
|
||||
var cache *sampleWithLabels |
||||
next := &sampleWithLabels{} |
||||
if iter.Next() { |
||||
cache = &sampleWithLabels{ |
||||
Sample: iter.Sample(), |
||||
labels: iter.Labels(), |
||||
} |
||||
next.Sample = cache.Sample |
||||
next.labels = cache.labels |
||||
} |
||||
return &peekingSampleIterator{ |
||||
iter: iter, |
||||
cache: cache, |
||||
next: next, |
||||
} |
||||
} |
||||
|
||||
func (it *peekingSampleIterator) Close() error { |
||||
return it.iter.Close() |
||||
} |
||||
|
||||
func (it *peekingSampleIterator) Labels() string { |
||||
if it.next != nil { |
||||
return it.next.labels |
||||
} |
||||
return "" |
||||
} |
||||
|
||||
func (it *peekingSampleIterator) Next() bool { |
||||
if it.cache != nil { |
||||
it.next.Sample = it.cache.Sample |
||||
it.next.labels = it.cache.labels |
||||
it.cacheNext() |
||||
return true |
||||
} |
||||
return false |
||||
} |
||||
|
||||
// cacheNext caches the next element if it exists.
|
||||
func (it *peekingSampleIterator) cacheNext() { |
||||
if it.iter.Next() { |
||||
it.cache.Sample = it.iter.Sample() |
||||
it.cache.labels = it.iter.Labels() |
||||
return |
||||
} |
||||
// nothing left removes the cached entry
|
||||
it.cache = nil |
||||
} |
||||
|
||||
func (it *peekingSampleIterator) Sample() logproto.Sample { |
||||
if it.next != nil { |
||||
return it.next.Sample |
||||
} |
||||
return logproto.Sample{} |
||||
} |
||||
|
||||
func (it *peekingSampleIterator) Peek() (string, logproto.Sample, bool) { |
||||
if it.cache != nil { |
||||
return it.cache.labels, it.cache.Sample, true |
||||
} |
||||
return "", logproto.Sample{}, false |
||||
} |
||||
|
||||
func (it *peekingSampleIterator) Error() error { |
||||
return it.iter.Error() |
||||
} |
||||
|
||||
type sampleIteratorHeap []SampleIterator |
||||
|
||||
func (h sampleIteratorHeap) Len() int { return len(h) } |
||||
func (h sampleIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } |
||||
func (h sampleIteratorHeap) Peek() SampleIterator { return h[0] } |
||||
func (h *sampleIteratorHeap) Push(x interface{}) { |
||||
*h = append(*h, x.(SampleIterator)) |
||||
} |
||||
|
||||
func (h *sampleIteratorHeap) Pop() interface{} { |
||||
old := *h |
||||
n := len(old) |
||||
x := old[n-1] |
||||
*h = old[0 : n-1] |
||||
return x |
||||
} |
||||
|
||||
func (h sampleIteratorHeap) Less(i, j int) bool { |
||||
s1, s2 := h[i].Sample(), h[j].Sample() |
||||
switch { |
||||
case s1.Timestamp < s2.Timestamp: |
||||
return true |
||||
case s1.Timestamp > s2.Timestamp: |
||||
return false |
||||
default: |
||||
return h[i].Labels() < h[j].Labels() |
||||
} |
||||
} |
||||
|
||||
// heapSampleIterator iterates over a heap of iterators.
|
||||
type heapSampleIterator struct { |
||||
heap *sampleIteratorHeap |
||||
is []SampleIterator |
||||
prefetched bool |
||||
stats *stats.ChunkData |
||||
|
||||
tuples []sampletuple |
||||
curr logproto.Sample |
||||
currLabels string |
||||
errs []error |
||||
} |
||||
|
||||
// NewHeapSampleIterator returns a new iterator which uses a heap to merge together
|
||||
// entries for multiple iterators.
|
||||
func NewHeapSampleIterator(ctx context.Context, is []SampleIterator) SampleIterator { |
||||
|
||||
return &heapSampleIterator{ |
||||
stats: stats.GetChunkData(ctx), |
||||
is: is, |
||||
heap: &sampleIteratorHeap{}, |
||||
tuples: make([]sampletuple, 0, len(is)), |
||||
} |
||||
} |
||||
|
||||
// prefetch iterates over all inner iterators to merge together, calls Next() on
|
||||
// each of them to prefetch the first entry and pushes of them - who are not
|
||||
// empty - to the heap
|
||||
func (i *heapSampleIterator) prefetch() { |
||||
if i.prefetched { |
||||
return |
||||
} |
||||
|
||||
i.prefetched = true |
||||
for _, it := range i.is { |
||||
i.requeue(it, false) |
||||
} |
||||
|
||||
// We can now clear the list of input iterators to merge, given they have all
|
||||
// been processed and the non empty ones have been pushed to the heap
|
||||
i.is = nil |
||||
} |
||||
|
||||
// requeue pushes the input ei EntryIterator to the heap, advancing it via an ei.Next()
|
||||
// call unless the advanced input parameter is true. In this latter case it expects that
|
||||
// the iterator has already been advanced before calling requeue().
|
||||
//
|
||||
// If the iterator has no more entries or an error occur while advancing it, the iterator
|
||||
// is not pushed to the heap and any possible error captured, so that can be get via Error().
|
||||
func (i *heapSampleIterator) requeue(ei SampleIterator, advanced bool) { |
||||
if advanced || ei.Next() { |
||||
heap.Push(i.heap, ei) |
||||
return |
||||
} |
||||
|
||||
if err := ei.Error(); err != nil { |
||||
i.errs = append(i.errs, err) |
||||
} |
||||
helpers.LogError("closing iterator", ei.Close) |
||||
} |
||||
|
||||
type sampletuple struct { |
||||
logproto.Sample |
||||
SampleIterator |
||||
} |
||||
|
||||
func (i *heapSampleIterator) Next() bool { |
||||
i.prefetch() |
||||
|
||||
if i.heap.Len() == 0 { |
||||
return false |
||||
} |
||||
|
||||
// We support multiple entries with the same timestamp, and we want to
|
||||
// preserve their original order. We look at all the top entries in the
|
||||
// heap with the same timestamp, and pop the ones whose common value
|
||||
// occurs most often.
|
||||
for i.heap.Len() > 0 { |
||||
next := i.heap.Peek() |
||||
sample := next.Sample() |
||||
if len(i.tuples) > 0 && (i.tuples[0].Labels() != next.Labels() || i.tuples[0].Timestamp != sample.Timestamp) { |
||||
break |
||||
} |
||||
|
||||
heap.Pop(i.heap) |
||||
i.tuples = append(i.tuples, sampletuple{ |
||||
Sample: sample, |
||||
SampleIterator: next, |
||||
}) |
||||
} |
||||
|
||||
i.curr = i.tuples[0].Sample |
||||
i.currLabels = i.tuples[0].Labels() |
||||
t := i.tuples[0] |
||||
if len(i.tuples) == 1 { |
||||
i.requeue(i.tuples[0].SampleIterator, false) |
||||
i.tuples = i.tuples[:0] |
||||
return true |
||||
} |
||||
// Requeue the iterators, advancing them if they were consumed.
|
||||
for j := range i.tuples { |
||||
if i.tuples[j].Hash != i.curr.Hash { |
||||
i.requeue(i.tuples[j].SampleIterator, true) |
||||
continue |
||||
} |
||||
// we count as duplicates only if the tuple is not the one (t) used to fill the current entry
|
||||
if i.tuples[j] != t { |
||||
i.stats.TotalDuplicates++ |
||||
} |
||||
i.requeue(i.tuples[j].SampleIterator, false) |
||||
} |
||||
i.tuples = i.tuples[:0] |
||||
return true |
||||
} |
||||
|
||||
func (i *heapSampleIterator) Sample() logproto.Sample { |
||||
return i.curr |
||||
} |
||||
|
||||
func (i *heapSampleIterator) Labels() string { |
||||
return i.currLabels |
||||
} |
||||
|
||||
func (i *heapSampleIterator) Error() error { |
||||
switch len(i.errs) { |
||||
case 0: |
||||
return nil |
||||
case 1: |
||||
return i.errs[0] |
||||
default: |
||||
return fmt.Errorf("Multiple errors: %+v", i.errs) |
||||
} |
||||
} |
||||
|
||||
func (i *heapSampleIterator) Close() error { |
||||
for i.heap.Len() > 0 { |
||||
if err := i.heap.Pop().(SampleIterator).Close(); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
i.tuples = nil |
||||
return nil |
||||
} |
||||
|
||||
type sampleQueryClientIterator struct { |
||||
client QuerySampleClient |
||||
err error |
||||
curr SampleIterator |
||||
} |
||||
|
||||
// QuerySampleClient is GRPC stream client with only method used by the SampleQueryClientIterator
|
||||
type QuerySampleClient interface { |
||||
Recv() (*logproto.SampleQueryResponse, error) |
||||
Context() context.Context |
||||
CloseSend() error |
||||
} |
||||
|
||||
// NewQueryClientIterator returns an iterator over a QueryClient.
|
||||
func NewSampleQueryClientIterator(client QuerySampleClient) SampleIterator { |
||||
return &sampleQueryClientIterator{ |
||||
client: client, |
||||
} |
||||
} |
||||
|
||||
func (i *sampleQueryClientIterator) Next() bool { |
||||
for i.curr == nil || !i.curr.Next() { |
||||
batch, err := i.client.Recv() |
||||
if err == io.EOF { |
||||
return false |
||||
} else if err != nil { |
||||
i.err = err |
||||
return false |
||||
} |
||||
|
||||
i.curr = NewSampleQueryResponseIterator(i.client.Context(), batch) |
||||
} |
||||
|
||||
return true |
||||
} |
||||
|
||||
func (i *sampleQueryClientIterator) Sample() logproto.Sample { |
||||
return i.curr.Sample() |
||||
} |
||||
|
||||
func (i *sampleQueryClientIterator) Labels() string { |
||||
return i.curr.Labels() |
||||
} |
||||
|
||||
func (i *sampleQueryClientIterator) Error() error { |
||||
return i.err |
||||
} |
||||
|
||||
func (i *sampleQueryClientIterator) Close() error { |
||||
return i.client.CloseSend() |
||||
} |
||||
|
||||
// NewSampleQueryResponseIterator returns an iterator over a SampleQueryResponse.
|
||||
func NewSampleQueryResponseIterator(ctx context.Context, resp *logproto.SampleQueryResponse) SampleIterator { |
||||
return NewMultiSeriesIterator(ctx, resp.Series) |
||||
} |
||||
|
||||
type seriesIterator struct { |
||||
i int |
||||
samples []logproto.Sample |
||||
labels string |
||||
} |
||||
|
||||
// NewMultiSeriesIterator returns an iterator over multiple logproto.Series
|
||||
func NewMultiSeriesIterator(ctx context.Context, series []logproto.Series) SampleIterator { |
||||
is := make([]SampleIterator, 0, len(series)) |
||||
for i := range series { |
||||
is = append(is, NewSeriesIterator(series[i])) |
||||
} |
||||
return NewHeapSampleIterator(ctx, is) |
||||
} |
||||
|
||||
// NewSeriesIterator iterates over sample in a series.
|
||||
func NewSeriesIterator(series logproto.Series) SampleIterator { |
||||
return &seriesIterator{ |
||||
i: -1, |
||||
samples: series.Samples, |
||||
labels: series.Labels, |
||||
} |
||||
} |
||||
|
||||
func (i *seriesIterator) Next() bool { |
||||
i.i++ |
||||
return i.i < len(i.samples) |
||||
} |
||||
|
||||
func (i *seriesIterator) Error() error { |
||||
return nil |
||||
} |
||||
|
||||
func (i *seriesIterator) Labels() string { |
||||
return i.labels |
||||
} |
||||
|
||||
func (i *seriesIterator) Sample() logproto.Sample { |
||||
return i.samples[i.i] |
||||
} |
||||
|
||||
func (i *seriesIterator) Close() error { |
||||
return nil |
||||
} |
||||
|
||||
type nonOverlappingSampleIterator struct { |
||||
labels string |
||||
i int |
||||
iterators []SampleIterator |
||||
curr SampleIterator |
||||
} |
||||
|
||||
// NewNonOverlappingSampleIterator gives a chained iterator over a list of iterators.
|
||||
func NewNonOverlappingSampleIterator(iterators []SampleIterator, labels string) SampleIterator { |
||||
return &nonOverlappingSampleIterator{ |
||||
labels: labels, |
||||
iterators: iterators, |
||||
} |
||||
} |
||||
|
||||
func (i *nonOverlappingSampleIterator) Next() bool { |
||||
for i.curr == nil || !i.curr.Next() { |
||||
if len(i.iterators) == 0 { |
||||
if i.curr != nil { |
||||
i.curr.Close() |
||||
} |
||||
return false |
||||
} |
||||
if i.curr != nil { |
||||
i.curr.Close() |
||||
} |
||||
i.i++ |
||||
i.curr, i.iterators = i.iterators[0], i.iterators[1:] |
||||
} |
||||
|
||||
return true |
||||
} |
||||
|
||||
func (i *nonOverlappingSampleIterator) Sample() logproto.Sample { |
||||
return i.curr.Sample() |
||||
} |
||||
|
||||
func (i *nonOverlappingSampleIterator) Labels() string { |
||||
if i.labels != "" { |
||||
return i.labels |
||||
} |
||||
|
||||
return i.curr.Labels() |
||||
} |
||||
|
||||
func (i *nonOverlappingSampleIterator) Error() error { |
||||
if i.curr != nil { |
||||
return i.curr.Error() |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (i *nonOverlappingSampleIterator) Close() error { |
||||
for _, iter := range i.iterators { |
||||
iter.Close() |
||||
} |
||||
i.iterators = nil |
||||
return nil |
||||
} |
||||
|
||||
type timeRangedSampleIterator struct { |
||||
SampleIterator |
||||
mint, maxt int64 |
||||
} |
||||
|
||||
// NewTimeRangedSampleIterator returns an iterator which filters entries by time range.
|
||||
func NewTimeRangedSampleIterator(it SampleIterator, mint, maxt int64) SampleIterator { |
||||
return &timeRangedSampleIterator{ |
||||
SampleIterator: it, |
||||
mint: mint, |
||||
maxt: maxt, |
||||
} |
||||
} |
||||
|
||||
func (i *timeRangedSampleIterator) Next() bool { |
||||
ok := i.SampleIterator.Next() |
||||
if !ok { |
||||
i.SampleIterator.Close() |
||||
return ok |
||||
} |
||||
ts := i.SampleIterator.Sample().Timestamp |
||||
for ok && i.mint > ts { |
||||
ok = i.SampleIterator.Next() |
||||
if !ok { |
||||
continue |
||||
} |
||||
ts = i.SampleIterator.Sample().Timestamp |
||||
} |
||||
if ok { |
||||
if ts == i.mint { // The mint is inclusive
|
||||
return true |
||||
} |
||||
if i.maxt < ts || i.maxt == ts { // The maxt is exclusive.
|
||||
ok = false |
||||
} |
||||
} |
||||
if !ok { |
||||
i.SampleIterator.Close() |
||||
} |
||||
return ok |
||||
} |
||||
|
||||
// ReadBatch reads a set of entries off an iterator.
|
||||
func ReadSampleBatch(i SampleIterator, size uint32) (*logproto.SampleQueryResponse, uint32, error) { |
||||
series := map[string]*logproto.Series{} |
||||
respSize := uint32(0) |
||||
for ; respSize < size && i.Next(); respSize++ { |
||||
labels, sample := i.Labels(), i.Sample() |
||||
s, ok := series[labels] |
||||
if !ok { |
||||
s = &logproto.Series{ |
||||
Labels: labels, |
||||
} |
||||
series[labels] = s |
||||
} |
||||
s.Samples = append(s.Samples, sample) |
||||
} |
||||
|
||||
result := logproto.SampleQueryResponse{ |
||||
Series: make([]logproto.Series, 0, len(series)), |
||||
} |
||||
for _, s := range series { |
||||
result.Series = append(result.Series, *s) |
||||
} |
||||
return &result, respSize, i.Error() |
||||
} |
||||
@ -0,0 +1,195 @@ |
||||
package iter |
||||
|
||||
import ( |
||||
"context" |
||||
"io" |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/stretchr/testify/require" |
||||
|
||||
"github.com/grafana/loki/pkg/logproto" |
||||
) |
||||
|
||||
func TestNewPeekingSampleIterator(t *testing.T) { |
||||
iter := NewPeekingSampleIterator(NewSeriesIterator(logproto.Series{ |
||||
Samples: []logproto.Sample{ |
||||
{ |
||||
Timestamp: time.Unix(0, 1).UnixNano(), |
||||
}, |
||||
{ |
||||
Timestamp: time.Unix(0, 2).UnixNano(), |
||||
}, |
||||
{ |
||||
Timestamp: time.Unix(0, 3).UnixNano(), |
||||
}, |
||||
}, |
||||
})) |
||||
_, peek, ok := iter.Peek() |
||||
if peek.Timestamp != 1 { |
||||
t.Fatal("wrong peeked time.") |
||||
} |
||||
if !ok { |
||||
t.Fatal("should be ok.") |
||||
} |
||||
hasNext := iter.Next() |
||||
if !hasNext { |
||||
t.Fatal("should have next.") |
||||
} |
||||
if iter.Sample().Timestamp != 1 { |
||||
t.Fatal("wrong peeked time.") |
||||
} |
||||
|
||||
_, peek, ok = iter.Peek() |
||||
if peek.Timestamp != 2 { |
||||
t.Fatal("wrong peeked time.") |
||||
} |
||||
if !ok { |
||||
t.Fatal("should be ok.") |
||||
} |
||||
hasNext = iter.Next() |
||||
if !hasNext { |
||||
t.Fatal("should have next.") |
||||
} |
||||
if iter.Sample().Timestamp != 2 { |
||||
t.Fatal("wrong peeked time.") |
||||
} |
||||
_, peek, ok = iter.Peek() |
||||
if peek.Timestamp != 3 { |
||||
t.Fatal("wrong peeked time.") |
||||
} |
||||
if !ok { |
||||
t.Fatal("should be ok.") |
||||
} |
||||
hasNext = iter.Next() |
||||
if !hasNext { |
||||
t.Fatal("should have next.") |
||||
} |
||||
if iter.Sample().Timestamp != 3 { |
||||
t.Fatal("wrong peeked time.") |
||||
} |
||||
_, _, ok = iter.Peek() |
||||
if ok { |
||||
t.Fatal("should not be ok.") |
||||
} |
||||
require.NoError(t, iter.Close()) |
||||
require.NoError(t, iter.Error()) |
||||
} |
||||
|
||||
func sample(i int) logproto.Sample { |
||||
return logproto.Sample{ |
||||
Timestamp: int64(i), |
||||
Hash: uint64(i), |
||||
Value: float64(1), |
||||
} |
||||
} |
||||
|
||||
var varSeries = logproto.Series{ |
||||
Labels: `{foo="var"}`, |
||||
Samples: []logproto.Sample{ |
||||
sample(1), sample(2), sample(3), |
||||
}, |
||||
} |
||||
var carSeries = logproto.Series{ |
||||
Labels: `{foo="car"}`, |
||||
Samples: []logproto.Sample{ |
||||
sample(1), sample(2), sample(3), |
||||
}, |
||||
} |
||||
|
||||
func TestNewHeapSampleIterator(t *testing.T) { |
||||
it := NewHeapSampleIterator(context.Background(), |
||||
[]SampleIterator{ |
||||
NewSeriesIterator(varSeries), |
||||
NewSeriesIterator(carSeries), |
||||
NewSeriesIterator(carSeries), |
||||
NewSeriesIterator(varSeries), |
||||
NewSeriesIterator(carSeries), |
||||
NewSeriesIterator(varSeries), |
||||
NewSeriesIterator(carSeries), |
||||
}) |
||||
|
||||
for i := 1; i < 4; i++ { |
||||
require.True(t, it.Next(), i) |
||||
require.Equal(t, `{foo="car"}`, it.Labels(), i) |
||||
require.Equal(t, sample(i), it.Sample(), i) |
||||
require.True(t, it.Next(), i) |
||||
require.Equal(t, `{foo="var"}`, it.Labels(), i) |
||||
require.Equal(t, sample(i), it.Sample(), i) |
||||
} |
||||
require.False(t, it.Next()) |
||||
require.NoError(t, it.Error()) |
||||
require.NoError(t, it.Close()) |
||||
} |
||||
|
||||
type fakeSampleClient struct { |
||||
series [][]logproto.Series |
||||
curr int |
||||
} |
||||
|
||||
func (f *fakeSampleClient) Recv() (*logproto.SampleQueryResponse, error) { |
||||
if f.curr >= len(f.series) { |
||||
return nil, io.EOF |
||||
} |
||||
res := &logproto.SampleQueryResponse{ |
||||
Series: f.series[f.curr], |
||||
} |
||||
f.curr++ |
||||
return res, nil |
||||
} |
||||
|
||||
func (fakeSampleClient) Context() context.Context { return context.Background() } |
||||
func (fakeSampleClient) CloseSend() error { return nil } |
||||
func TestNewSampleQueryClientIterator(t *testing.T) { |
||||
|
||||
it := NewSampleQueryClientIterator(&fakeSampleClient{ |
||||
series: [][]logproto.Series{ |
||||
{varSeries}, |
||||
{carSeries}, |
||||
}, |
||||
}) |
||||
for i := 1; i < 4; i++ { |
||||
require.True(t, it.Next(), i) |
||||
require.Equal(t, `{foo="var"}`, it.Labels(), i) |
||||
require.Equal(t, sample(i), it.Sample(), i) |
||||
} |
||||
for i := 1; i < 4; i++ { |
||||
require.True(t, it.Next(), i) |
||||
require.Equal(t, `{foo="car"}`, it.Labels(), i) |
||||
require.Equal(t, sample(i), it.Sample(), i) |
||||
} |
||||
require.False(t, it.Next()) |
||||
require.NoError(t, it.Error()) |
||||
require.NoError(t, it.Close()) |
||||
} |
||||
|
||||
func TestNewNonOverlappingSampleIterator(t *testing.T) { |
||||
it := NewNonOverlappingSampleIterator([]SampleIterator{ |
||||
NewSeriesIterator(varSeries), |
||||
NewSeriesIterator(logproto.Series{ |
||||
Labels: varSeries.Labels, |
||||
Samples: []logproto.Sample{sample(4), sample(5)}, |
||||
}), |
||||
}, varSeries.Labels) |
||||
|
||||
for i := 1; i < 6; i++ { |
||||
require.True(t, it.Next(), i) |
||||
require.Equal(t, `{foo="var"}`, it.Labels(), i) |
||||
require.Equal(t, sample(i), it.Sample(), i) |
||||
} |
||||
require.False(t, it.Next()) |
||||
require.NoError(t, it.Error()) |
||||
require.NoError(t, it.Close()) |
||||
} |
||||
|
||||
func TestReadSampleBatch(t *testing.T) { |
||||
res, size, err := ReadSampleBatch(NewSeriesIterator(carSeries), 1) |
||||
require.Equal(t, &logproto.SampleQueryResponse{Series: []logproto.Series{{Labels: carSeries.Labels, Samples: []logproto.Sample{sample(1)}}}}, res) |
||||
require.Equal(t, uint32(1), size) |
||||
require.NoError(t, err) |
||||
|
||||
res, size, err = ReadSampleBatch(NewMultiSeriesIterator(context.Background(), []logproto.Series{carSeries, varSeries}), 100) |
||||
require.ElementsMatch(t, []logproto.Series{carSeries, varSeries}, res.Series) |
||||
require.Equal(t, uint32(6), size) |
||||
require.NoError(t, err) |
||||
} |
||||
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@ -1,104 +1,24 @@ |
||||
package logql |
||||
|
||||
import ( |
||||
"github.com/grafana/loki/pkg/iter" |
||||
"github.com/grafana/loki/pkg/logproto" |
||||
) |
||||
|
||||
var ( |
||||
extractBytes = bytesSampleExtractor{} |
||||
extractCount = countSampleExtractor{} |
||||
ExtractBytes = bytesSampleExtractor{} |
||||
ExtractCount = countSampleExtractor{} |
||||
) |
||||
|
||||
// SeriesIterator is an iterator that iterate over a stream of logs and returns sample.
|
||||
type SeriesIterator interface { |
||||
Close() error |
||||
Next() bool |
||||
Peek() (Sample, bool) |
||||
Error() error |
||||
} |
||||
|
||||
// Sample is a series sample
|
||||
type Sample struct { |
||||
Labels string |
||||
Value float64 |
||||
TimestampNano int64 |
||||
} |
||||
|
||||
type seriesIterator struct { |
||||
iter iter.PeekingEntryIterator |
||||
sampler SampleExtractor |
||||
|
||||
updated bool |
||||
cur Sample |
||||
} |
||||
|
||||
func newSeriesIterator(it iter.EntryIterator, sampler SampleExtractor) SeriesIterator { |
||||
return &seriesIterator{ |
||||
iter: iter.NewPeekingIterator(it), |
||||
sampler: sampler, |
||||
} |
||||
} |
||||
|
||||
func (e *seriesIterator) Close() error { |
||||
return e.iter.Close() |
||||
} |
||||
|
||||
func (e *seriesIterator) Next() bool { |
||||
e.updated = false |
||||
return e.iter.Next() |
||||
} |
||||
|
||||
func (e *seriesIterator) Peek() (Sample, bool) { |
||||
if e.updated { |
||||
return e.cur, true |
||||
} |
||||
|
||||
for { |
||||
lbs, entry, ok := e.iter.Peek() |
||||
if !ok { |
||||
return Sample{}, false |
||||
} |
||||
|
||||
// transform
|
||||
e.cur, ok = e.sampler.From(lbs, entry) |
||||
if ok { |
||||
break |
||||
} |
||||
if !e.iter.Next() { |
||||
return Sample{}, false |
||||
} |
||||
} |
||||
e.updated = true |
||||
return e.cur, true |
||||
} |
||||
|
||||
func (e *seriesIterator) Error() error { |
||||
return e.iter.Error() |
||||
} |
||||
|
||||
// SampleExtractor transforms a log entry into a sample.
|
||||
// In case of failure the second return value will be false.
|
||||
type SampleExtractor interface { |
||||
From(labels string, e logproto.Entry) (Sample, bool) |
||||
Extract(line []byte) (float64, bool) |
||||
} |
||||
|
||||
type countSampleExtractor struct{} |
||||
|
||||
func (countSampleExtractor) From(lbs string, entry logproto.Entry) (Sample, bool) { |
||||
return Sample{ |
||||
Labels: lbs, |
||||
TimestampNano: entry.Timestamp.UnixNano(), |
||||
Value: 1., |
||||
}, true |
||||
func (countSampleExtractor) Extract(line []byte) (float64, bool) { |
||||
return 1., true |
||||
} |
||||
|
||||
type bytesSampleExtractor struct{} |
||||
|
||||
func (bytesSampleExtractor) From(lbs string, entry logproto.Entry) (Sample, bool) { |
||||
return Sample{ |
||||
Labels: lbs, |
||||
TimestampNano: entry.Timestamp.UnixNano(), |
||||
Value: float64(len(entry.Line)), |
||||
}, true |
||||
func (bytesSampleExtractor) Extract(line []byte) (float64, bool) { |
||||
return float64(len(line)), true |
||||
} |
||||
|
||||
@ -1,159 +0,0 @@ |
||||
package logql |
||||
|
||||
import ( |
||||
"context" |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/stretchr/testify/require" |
||||
|
||||
"github.com/grafana/loki/pkg/iter" |
||||
"github.com/grafana/loki/pkg/logproto" |
||||
) |
||||
|
||||
func Test_seriesIterator_Peek(t *testing.T) { |
||||
type expectation struct { |
||||
ok bool |
||||
sample Sample |
||||
} |
||||
for _, test := range []struct { |
||||
name string |
||||
it SeriesIterator |
||||
expectations []expectation |
||||
}{ |
||||
{ |
||||
"count", |
||||
newSeriesIterator(iter.NewStreamIterator(newStream(5, identity, `{app="foo"}`)), extractCount), |
||||
[]expectation{ |
||||
{true, Sample{Labels: `{app="foo"}`, TimestampNano: 0, Value: 1}}, |
||||
{true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(1, 0).UnixNano(), Value: 1}}, |
||||
{true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(2, 0).UnixNano(), Value: 1}}, |
||||
{true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(3, 0).UnixNano(), Value: 1}}, |
||||
{true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(4, 0).UnixNano(), Value: 1}}, |
||||
{false, Sample{}}, |
||||
}, |
||||
}, |
||||
{ |
||||
"bytes empty", |
||||
newSeriesIterator( |
||||
iter.NewStreamIterator( |
||||
newStream( |
||||
3, |
||||
func(i int64) logproto.Entry { |
||||
return logproto.Entry{ |
||||
Timestamp: time.Unix(i, 0), |
||||
} |
||||
}, |
||||
`{app="foo"}`, |
||||
), |
||||
), |
||||
extractBytes, |
||||
), |
||||
[]expectation{ |
||||
{true, Sample{Labels: `{app="foo"}`, TimestampNano: 0, Value: 0}}, |
||||
{true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(1, 0).UnixNano(), Value: 0}}, |
||||
{true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(2, 0).UnixNano(), Value: 0}}, |
||||
{false, Sample{}}, |
||||
}, |
||||
}, |
||||
{ |
||||
"bytes", |
||||
newSeriesIterator( |
||||
iter.NewStreamIterator( |
||||
newStream( |
||||
3, |
||||
func(i int64) logproto.Entry { |
||||
return logproto.Entry{ |
||||
Timestamp: time.Unix(i, 0), |
||||
Line: "foo", |
||||
} |
||||
}, |
||||
`{app="foo"}`, |
||||
), |
||||
), |
||||
extractBytes, |
||||
), |
||||
[]expectation{ |
||||
{true, Sample{Labels: `{app="foo"}`, TimestampNano: 0, Value: 3}}, |
||||
{true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(1, 0).UnixNano(), Value: 3}}, |
||||
{true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(2, 0).UnixNano(), Value: 3}}, |
||||
{false, Sample{}}, |
||||
}, |
||||
}, |
||||
{ |
||||
"bytes backward", |
||||
newSeriesIterator( |
||||
iter.NewStreamsIterator(context.Background(), |
||||
[]logproto.Stream{ |
||||
newStream( |
||||
3, |
||||
func(i int64) logproto.Entry { |
||||
return logproto.Entry{ |
||||
Timestamp: time.Unix(i, 0), |
||||
Line: "foo", |
||||
} |
||||
}, |
||||
`{app="foo"}`, |
||||
), |
||||
newStream( |
||||
3, |
||||
func(i int64) logproto.Entry { |
||||
return logproto.Entry{ |
||||
Timestamp: time.Unix(i, 0), |
||||
Line: "barr", |
||||
} |
||||
}, |
||||
`{app="barr"}`, |
||||
), |
||||
}, |
||||
logproto.BACKWARD, |
||||
), |
||||
extractBytes, |
||||
), |
||||
[]expectation{ |
||||
{true, Sample{Labels: `{app="barr"}`, TimestampNano: 0, Value: 4}}, |
||||
{true, Sample{Labels: `{app="barr"}`, TimestampNano: time.Unix(1, 0).UnixNano(), Value: 4}}, |
||||
{true, Sample{Labels: `{app="barr"}`, TimestampNano: time.Unix(2, 0).UnixNano(), Value: 4}}, |
||||
{true, Sample{Labels: `{app="foo"}`, TimestampNano: 0, Value: 3}}, |
||||
{true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(1, 0).UnixNano(), Value: 3}}, |
||||
{true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(2, 0).UnixNano(), Value: 3}}, |
||||
{false, Sample{}}, |
||||
}, |
||||
}, |
||||
{ |
||||
"skip first", |
||||
newSeriesIterator(iter.NewStreamIterator(newStream(2, identity, `{app="foo"}`)), fakeSampler{}), |
||||
[]expectation{ |
||||
{true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(1, 0).UnixNano(), Value: 10}}, |
||||
{false, Sample{}}, |
||||
}, |
||||
}, |
||||
} { |
||||
t.Run(test.name, func(t *testing.T) { |
||||
for _, e := range test.expectations { |
||||
sample, ok := test.it.Peek() |
||||
require.Equal(t, e.ok, ok) |
||||
if !e.ok { |
||||
continue |
||||
} |
||||
require.Equal(t, e.sample, sample) |
||||
test.it.Next() |
||||
} |
||||
require.NoError(t, test.it.Close()) |
||||
}) |
||||
} |
||||
} |
||||
|
||||
// fakeSampler is a Sampler that returns no value for 0 timestamp otherwise always 10
|
||||
type fakeSampler struct{} |
||||
|
||||
func (fakeSampler) From(lbs string, entry logproto.Entry) (Sample, bool) { |
||||
if entry.Timestamp.UnixNano() == 0 { |
||||
return Sample{}, false |
||||
} |
||||
return Sample{ |
||||
Labels: lbs, |
||||
TimestampNano: entry.Timestamp.UnixNano(), |
||||
Value: 10, |
||||
}, true |
||||
} |
||||
Loading…
Reference in new issue