Add duplicates info and remove timing information. (#1496)

* Add duplicates info and remove timing information.

Collecting timing information in the chunkenc iterator is consuming too many resources.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* lint

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
Cyril Tovena, committed by GitHub
parent 616843b7e0
commit 33f70b70ce
12 changed files:
  1. pkg/chunkenc/decompression/context.go (4)
  2. pkg/chunkenc/memchunk.go (9)
  3. pkg/ingester/instance.go (2)
  4. pkg/iter/iterator.go (36)
  5. pkg/iter/iterator_test.go (17)
  6. pkg/logql/engine.go (4)
  7. pkg/logql/engine_test.go (4)
  8. pkg/logql/range_vector_test.go (3)
  9. pkg/querier/querier.go (2)
  10. pkg/querier/querier_mock_test.go (10)
  11. pkg/querier/tail.go (3)
  12. pkg/storage/iterator.go (8)

pkg/chunkenc/decompression/context.go
@@ -11,11 +11,11 @@ const ctxKey ctxKeyType = "decompression"
// Stats is decompression statistic
type Stats struct {
TimeDecompress time.Duration // Time spent decompressing chunks
TimeFiltering time.Duration // Time spent filtering lines
BytesDecompressed int64 // Total bytes decompressed data size
BytesCompressed int64 // Total bytes compressed read
FetchedChunks int64 // Total number of chunks fetched.
TotalDuplicates int64 // Total number of line duplicates from replication.
TimeFetching time.Duration // Time spent fetching chunks.
}
// NewContext creates a new decompression context
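For context, a minimal sketch of how these context-scoped stats are written and read, assembled from the calls that appear in the hunks below (decompression.Mutate, decompression.GetStats); the exact NewContext signature is an assumption, as the diff only shows its doc comment. Reading this hunk together with the engine.go and storage/iterator.go hunks, the timing fields TimeDecompress and TimeFiltering are the ones removed here, while TotalDuplicates and TimeFetching are the additions.

    package main

    import (
        "context"
        "fmt"

        "github.com/grafana/loki/pkg/chunkenc/decompression"
    )

    func main() {
        // Assumption: NewContext wraps a context with a fresh *Stats.
        ctx := decompression.NewContext(context.Background())

        // Producers (iterators, the chunk fetcher) accumulate into the per-query Stats.
        decompression.Mutate(ctx, func(s *decompression.Stats) {
            s.TotalDuplicates++
        })

        // Consumers (the LogQL engine) read the totals once the query has finished.
        stats := decompression.GetStats(ctx)
        fmt.Println(stats.TotalDuplicates, stats.FetchedChunks)
    }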

pkg/chunkenc/memchunk.go
@@ -560,8 +560,6 @@ func (li *listIterator) Labels() string { return "" }
type bufferedIterator struct {
origBytes []byte
rootCtx context.Context
timeDecompress time.Duration
timeFiltering time.Duration
bytesDecompressed int64
bufReader *bufio.Reader
@@ -600,21 +598,16 @@ func (si *bufferedIterator) Next() bool {
}
for {
start := time.Now()
ts, line, ok := si.moveNext()
si.timeDecompress += time.Since(start)
if !ok {
si.Close()
return false
}
// we decode always the line length and ts as varint
si.bytesDecompressed += int64(len(line)) + 2*binary.MaxVarintLen64
start = time.Now()
if si.filter != nil && !si.filter(line) {
si.timeFiltering += time.Since(start)
continue
}
si.timeFiltering += time.Since(start)
si.cur.Line = string(line)
si.cur.Timestamp = time.Unix(0, ts)
return true
@@ -690,8 +683,6 @@ func (si *bufferedIterator) Close() error {
func (si *bufferedIterator) close() {
decompression.Mutate(si.rootCtx, func(current *decompression.Stats) {
current.TimeDecompress += si.timeDecompress
current.TimeFiltering += si.timeFiltering
current.BytesDecompressed += si.bytesDecompressed
current.BytesCompressed += int64(len(si.origBytes))
})
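Read together, the two memchunk.go hunks leave the per-entry hot path free of timers; only byte accounting remains. Roughly, the surviving loop looks like this (reconstructed from the kept lines above, not copied verbatim from the file; it relies on encoding/binary and time as memchunk.go already does):

    for {
        ts, line, ok := si.moveNext()
        if !ok {
            si.Close()
            return false
        }
        // the line length and timestamp are always decoded as varints
        si.bytesDecompressed += int64(len(line)) + 2*binary.MaxVarintLen64
        // drop filtered lines without any time.Now()/time.Since bookkeeping
        if si.filter != nil && !si.filter(line) {
            continue
        }
        si.cur.Line = string(line)
        si.cur.Timestamp = time.Unix(0, ts)
        return true
    }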

pkg/ingester/instance.go
@@ -209,7 +209,7 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie
return err
}
iter := iter.NewHeapIterator(iters, req.Direction)
iter := iter.NewHeapIterator(queryServer.Context(), iters, req.Direction)
defer helpers.LogError("closing iterator", iter.Close)
return sendBatches(iter, queryServer, req.Limit)

pkg/iter/iterator.go
@@ -2,10 +2,12 @@ package iter
import (
"container/heap"
"context"
"fmt"
"io"
"time"
"github.com/grafana/loki/pkg/chunkenc/decompression"
"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/logproto"
)
@@ -130,17 +132,19 @@ type heapIterator struct {
}
is []EntryIterator
prefetched bool
ctx context.Context
tuples []tuple
currEntry logproto.Entry
currLabels string
errs []error
tuples []tuple
currEntry logproto.Entry
currLabels string
errs []error
linesDuplicate int64
}
// NewHeapIterator returns a new iterator which uses a heap to merge together
// entries for multiple iterators.
func NewHeapIterator(is []EntryIterator, direction logproto.Direction) HeapIterator {
result := &heapIterator{is: is}
func NewHeapIterator(ctx context.Context, is []EntryIterator, direction logproto.Direction) HeapIterator {
result := &heapIterator{is: is, ctx: ctx}
switch direction {
case logproto.BACKWARD:
result.heap = &iteratorMaxHeap{}
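The practical effect of the new signature is that every call site threads a context through, so the merge statistics land in the right query. A hedged usage sketch (mergeStreams is a hypothetical helper, not part of this change; it assumes the iter and logproto imports used throughout this diff):

    // mergeStreams shows the new call shape: the per-request context comes first,
    // so duplicate counts recorded by the heap iterator are attributed to this query.
    func mergeStreams(ctx context.Context, its []iter.EntryIterator) iter.EntryIterator {
        return iter.NewHeapIterator(ctx, its, logproto.FORWARD)
    }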
@@ -233,7 +237,12 @@ func (i *heapIterator) Next() bool {
// Requeue the iterators, advancing them if they were consumed.
for j := range i.tuples {
i.requeue(i.tuples[j].EntryIterator, i.tuples[j].Line != i.currEntry.Line)
if i.tuples[j].Line != i.currEntry.Line {
i.requeue(i.tuples[j].EntryIterator, true)
continue
}
i.linesDuplicate++
i.requeue(i.tuples[j].EntryIterator, false)
}
i.tuples = i.tuples[:0]
return true
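When several per-stream iterators surface entries with the same timestamp, tuples whose line matches the emitted entry are now counted before their iterators are advanced, while tuples with a different line are requeued untouched. A hedged illustration of what the counter ends up capturing for replicated streams (the stream contents are made up, and ctx is assumed to carry decompression stats, e.g. via decompression.NewContext):

    entries := []logproto.Entry{{Timestamp: time.Unix(0, 1), Line: "hello"}}
    streams := []*logproto.Stream{
        {Labels: `{app="foo"}`, Entries: entries}, // the same entries, as if
        {Labels: `{app="foo"}`, Entries: entries}, // returned by three
        {Labels: `{app="foo"}`, Entries: entries}, // replicated ingesters
    }
    it := iter.NewStreamsIterator(ctx, streams, logproto.FORWARD)
    for it.Next() {
        // each distinct line is emitted once
    }
    _ = it.Close() // Close flushes the duplicate count into the context stats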
@@ -302,6 +311,9 @@ func (i *heapIterator) Error() error {
}
func (i *heapIterator) Close() error {
decompression.Mutate(i.ctx, func(m *decompression.Stats) {
m.TotalDuplicates += i.linesDuplicate
})
for i.heap.Len() > 0 {
if err := i.heap.Pop().(EntryIterator).Close(); err != nil {
return err
@@ -325,21 +337,21 @@ func (i *heapIterator) Len() int {
}
// NewStreamsIterator returns an iterator over logproto.Stream
func NewStreamsIterator(streams []*logproto.Stream, direction logproto.Direction) EntryIterator {
func NewStreamsIterator(ctx context.Context, streams []*logproto.Stream, direction logproto.Direction) EntryIterator {
is := make([]EntryIterator, 0, len(streams))
for i := range streams {
is = append(is, NewStreamIterator(streams[i]))
}
return NewHeapIterator(is, direction)
return NewHeapIterator(ctx, is, direction)
}
// NewQueryResponseIterator returns an iterator over a QueryResponse.
func NewQueryResponseIterator(resp *logproto.QueryResponse, direction logproto.Direction) EntryIterator {
func NewQueryResponseIterator(ctx context.Context, resp *logproto.QueryResponse, direction logproto.Direction) EntryIterator {
is := make([]EntryIterator, 0, len(resp.Streams))
for i := range resp.Streams {
is = append(is, NewStreamIterator(resp.Streams[i]))
}
return NewHeapIterator(is, direction)
return NewHeapIterator(ctx, is, direction)
}
type queryClientIterator struct {
@@ -367,7 +379,7 @@ func (i *queryClientIterator) Next() bool {
return false
}
i.curr = NewQueryResponseIterator(batch, i.direction)
i.curr = NewQueryResponseIterator(i.client.Context(), batch, i.direction)
}
return true

pkg/iter/iterator_test.go
@@ -1,6 +1,7 @@
package iter
import (
"context"
"fmt"
"sort"
"testing"
@@ -40,7 +41,7 @@ func TestIterator(t *testing.T) {
// Test dedupe of overlapping iterators with the heap iterator.
{
iterator: NewHeapIterator([]EntryIterator{
iterator: NewHeapIterator(context.Background(), []EntryIterator{
mkStreamIterator(offset(0, identity), defaultLabels),
mkStreamIterator(offset(testSize/2, identity), defaultLabels),
mkStreamIterator(offset(testSize, identity), defaultLabels),
@@ -52,7 +53,7 @@ func TestIterator(t *testing.T) {
// Test dedupe of overlapping iterators with the heap iterator (backward).
{
iterator: NewHeapIterator([]EntryIterator{
iterator: NewHeapIterator(context.Background(), []EntryIterator{
mkStreamIterator(inverse(offset(0, identity)), defaultLabels),
mkStreamIterator(inverse(offset(-testSize/2, identity)), defaultLabels),
mkStreamIterator(inverse(offset(-testSize, identity)), defaultLabels),
@@ -64,7 +65,7 @@ func TestIterator(t *testing.T) {
// Test dedupe of entries with the same timestamp but different entries.
{
iterator: NewHeapIterator([]EntryIterator{
iterator: NewHeapIterator(context.Background(), []EntryIterator{
mkStreamIterator(offset(0, constant(0)), defaultLabels),
mkStreamIterator(offset(0, constant(0)), defaultLabels),
mkStreamIterator(offset(testSize, constant(0)), defaultLabels),
@@ -105,7 +106,7 @@ func TestIteratorMultipleLabels(t *testing.T) {
}{
// Test merging with differing labels but same timestamps and values.
{
iterator: NewHeapIterator([]EntryIterator{
iterator: NewHeapIterator(context.Background(), []EntryIterator{
mkStreamIterator(identity, "{foobar: \"baz1\"}"),
mkStreamIterator(identity, "{foobar: \"baz2\"}"),
}, logproto.FORWARD),
@@ -123,7 +124,7 @@ func TestIteratorMultipleLabels(t *testing.T) {
// Test merging with differing labels but all the same timestamps and different values.
{
iterator: NewHeapIterator([]EntryIterator{
iterator: NewHeapIterator(context.Background(), []EntryIterator{
mkStreamIterator(constant(0), "{foobar: \"baz1\"}"),
mkStreamIterator(constant(0), "{foobar: \"baz2\"}"),
}, logproto.FORWARD),
@@ -177,7 +178,7 @@ func TestHeapIteratorPrefetch(t *testing.T) {
t.Run(testName, func(t *testing.T) {
t.Parallel()
i := NewHeapIterator([]EntryIterator{
i := NewHeapIterator(context.Background(), []EntryIterator{
mkStreamIterator(identity, "{foobar: \"baz1\"}"),
mkStreamIterator(identity, "{foobar: \"baz2\"}"),
}, logproto.FORWARD)
@@ -279,7 +280,7 @@ func TestReverseEntryIterator(t *testing.T) {
itr1 := mkStreamIterator(inverse(offset(testSize, identity)), defaultLabels)
itr2 := mkStreamIterator(inverse(offset(testSize, identity)), "{foobar: \"bazbar\"}")
heapIterator := NewHeapIterator([]EntryIterator{itr1, itr2}, logproto.BACKWARD)
heapIterator := NewHeapIterator(context.Background(), []EntryIterator{itr1, itr2}, logproto.BACKWARD)
reversedIter, err := NewReversedIter(heapIterator, testSize, false)
require.NoError(t, err)
@@ -301,7 +302,7 @@ func TestReverseEntryIteratorUnlimited(t *testing.T) {
itr1 := mkStreamIterator(offset(testSize, identity), defaultLabels)
itr2 := mkStreamIterator(offset(testSize, identity), "{foobar: \"bazbar\"}")
heapIterator := NewHeapIterator([]EntryIterator{itr1, itr2}, logproto.BACKWARD)
heapIterator := NewHeapIterator(context.Background(), []EntryIterator{itr1, itr2}, logproto.BACKWARD)
reversedIter, err := NewReversedIter(heapIterator, 0, false)
require.NoError(t, err)

pkg/logql/engine.go
@@ -169,8 +169,8 @@ func (ng *Engine) exec(ctx context.Context, q *query) (promql.Value, error) {
defer func() {
stats := decompression.GetStats(ctx)
level.Debug(log).Log(
"Time Decompressing (ms)", stats.TimeDecompress.Nanoseconds()/int64(time.Millisecond),
"Time Filtering (ms)", stats.TimeFiltering.Nanoseconds()/int64(time.Millisecond),
"Time Fetching chunk (ms)", stats.TimeFetching.Nanoseconds()/int64(time.Millisecond),
"Total Duplicates", stats.TotalDuplicates,
"Fetched chunks", stats.FetchedChunks,
"Total bytes compressed (MB)", stats.BytesCompressed/1024/1024,
"Total bytes uncompressed (MB)", stats.BytesDecompressed/1024/1024,

pkg/logql/engine_test.go
@@ -771,7 +771,7 @@ func getLocalQuerier(size int64) Querier {
iter.NewStreamIterator(newStream(size, identity, `{app="bar"}`)),
}
return QuerierFunc(func(ctx context.Context, p SelectParams) (iter.EntryIterator, error) {
return iter.NewHeapIterator(iters, p.Direction), nil
return iter.NewHeapIterator(ctx, iters, p.Direction), nil
})
}
@@ -799,7 +799,7 @@ func (q *querierRecorder) Select(ctx context.Context, p SelectParams) (iter.Entr
for _, s := range streams {
iters = append(iters, iter.NewStreamIterator(s))
}
return iter.NewHeapIterator(iters, p.Direction), nil
return iter.NewHeapIterator(ctx, iters, p.Direction), nil
}
func paramsID(p SelectParams) string {

pkg/logql/range_vector_test.go
@@ -1,6 +1,7 @@
package logql
import (
"context"
"fmt"
"testing"
"time"
@@ -29,7 +30,7 @@ var labelFoo, _ = promql.ParseMetric("{app=\"foo\"}")
var labelBar, _ = promql.ParseMetric("{app=\"bar\"}")
func newEntryIterator() iter.EntryIterator {
return iter.NewHeapIterator([]iter.EntryIterator{
return iter.NewHeapIterator(context.Background(), []iter.EntryIterator{
iter.NewStreamIterator(&logproto.Stream{
Labels: labelFoo.String(),
Entries: entries,

pkg/querier/querier.go
@@ -158,7 +158,7 @@ func (q *Querier) Select(ctx context.Context, params logql.SelectParams) (iter.E
return nil, err
}
iterators := append(ingesterIterators, chunkStoreIterators)
return iter.NewHeapIterator(iterators, params.Direction), nil
return iter.NewHeapIterator(ctx, iterators, params.Direction), nil
}
func (q *Querier) queryIngesters(ctx context.Context, params logql.SelectParams) ([]iter.EntryIterator, error) {

pkg/querier/querier_mock_test.go
@@ -60,6 +60,10 @@ func (c *querierClientMock) Series(ctx context.Context, in *logproto.SeriesReque
return res.(*logproto.SeriesResponse), args.Error(1)
}
func (c *querierClientMock) Context() context.Context {
return context.Background()
}
// newIngesterClientMockFactory creates a factory function always returning
// the input querierClientMock
func newIngesterClientMockFactory(c *querierClientMock) cortex_client.Factory {
@@ -118,6 +122,10 @@ func (c *queryClientMock) RecvMsg(m interface{}) error {
return nil
}
func (c *queryClientMock) Context() context.Context {
return context.Background()
}
// tailClientMock is mockable version of Querier_TailClient
type tailClientMock struct {
util.ExtendedMock
@@ -290,7 +298,7 @@ func mockStreamIterFromLabelSets(from, quantity int, sets []string) iter.EntryIt
streams = append(streams, mockStreamWithLabels(from, quantity, s))
}
return iter.NewStreamsIterator(streams, logproto.FORWARD)
return iter.NewStreamsIterator(context.Background(), streams, logproto.FORWARD)
}
// mockStream return a stream with quantity entries, where entries timestamp and

pkg/querier/tail.go
@@ -1,6 +1,7 @@
package querier
import (
"context"
"sync"
"time"
@@ -268,7 +269,7 @@ func newTailer(
waitEntryThrottle time.Duration,
) *Tailer {
t := Tailer{
openStreamIterator: iter.NewHeapIterator([]iter.EntryIterator{historicEntries}, logproto.FORWARD),
openStreamIterator: iter.NewHeapIterator(context.Background(), []iter.EntryIterator{historicEntries}, logproto.FORWARD),
querierTailClients: querierTailClients,
delayFor: delayFor,
responseChan: make(chan *loghttp.TailResponse, maxBufferedTailResponses),

pkg/storage/iterator.go
@@ -302,7 +302,7 @@ func newChunksIterator(ctx context.Context, chunks []*chunkenc.LazyChunk, matche
return nil, err
}
return iter.NewHeapIterator(iters, direction), nil
return iter.NewHeapIterator(ctx, iters, direction), nil
}
func buildIterators(ctx context.Context, chks map[model.Fingerprint][][]*chunkenc.LazyChunk, filter logql.Filter, direction logproto.Direction, from, through time.Time) ([]iter.EntryIterator, error) {
@@ -341,7 +341,7 @@ func buildHeapIterator(ctx context.Context, chks [][]*chunkenc.LazyChunk, filter
result = append(result, iter.NewNonOverlappingIterator(iterators, labels))
}
return iter.NewHeapIterator(result, direction), nil
return iter.NewHeapIterator(ctx, result, direction), nil
}
func filterSeriesByMatchers(chks map[model.Fingerprint][][]*chunkenc.LazyChunk, matchers []*labels.Matcher) map[model.Fingerprint][][]*chunkenc.LazyChunk {
@@ -360,6 +360,10 @@ outer:
func fetchLazyChunks(ctx context.Context, chunks []*chunkenc.LazyChunk) error {
log, ctx := spanlogger.New(ctx, "LokiStore.fetchLazyChunks")
defer log.Finish()
start := time.Now()
defer decompression.Mutate(ctx, func(m *decompression.Stats) {
m.TimeFetching += time.Since(start)
})
var totalChunks int
chksByFetcher := map[*chunk.Fetcher][]*chunkenc.LazyChunk{}
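Because the measurement sits in a deferred closure, time.Since(start) is evaluated only when fetchLazyChunks returns, so the whole fetch across all fetchers is attributed to TimeFetching. A minimal sketch of the same pattern (timedFetch is a hypothetical helper, not part of this change):

    // timedFetch runs fetch and adds its wall-clock duration to the per-query
    // TimeFetching stat; the deferred closure captures start and runs on return,
    // exactly like the defer in fetchLazyChunks above.
    func timedFetch(ctx context.Context, fetch func() error) error {
        start := time.Now()
        defer decompression.Mutate(ctx, func(m *decompression.Stats) {
            m.TimeFetching += time.Since(start)
        })
        return fetch()
    }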
