Improve logql query statistics collection. (#1573)

* Improve logql query statistics collection.
This also adds information about ingester queries.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Improve documentation.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes a bad copy/paste in the result log.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes ingester tests.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Improve headchunk efficiency.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fix bad commit on master.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes the new interface of the ingester gRPC server.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Improve documentation of fields.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
Author: Cyril Tovena (committed via GitHub)
parent 887f0cce7b
commit f1b8d4d8ad
pkg/chunkenc/decompression/context.go | 42
pkg/chunkenc/memchunk.go | 27
pkg/chunkenc/memchunk_test.go | 2
pkg/ingester/ingester_test.go | 3
pkg/ingester/instance.go | 55
pkg/iter/iterator.go | 20
pkg/logql/engine.go | 17
pkg/logql/stats/context.go | 193
pkg/logql/stats/context_test.go | 92
pkg/logql/stats/grpc.go | 113
pkg/logql/stats/grpc_test.go | 110
pkg/querier/querier.go | 3
pkg/storage/iterator.go | 15
pkg/storage/store.go | 4
vendor/google.golang.org/grpc/test/bufconn/bufconn.go | 308
vendor/modules.txt | 1
16 files changed

@ -1,42 +0,0 @@
package decompression
import (
"context"
"time"
)
type ctxKeyType string
const ctxKey ctxKeyType = "decompression"
// Stats is decompression statistic
type Stats struct {
BytesDecompressed int64 // Total bytes decompressed data size
BytesCompressed int64 // Total bytes compressed read
FetchedChunks int64 // Total number of chunks fetched.
TotalDuplicates int64 // Total number of line duplicates from replication.
TimeFetching time.Duration // Time spent fetching chunks.
}
// NewContext creates a new decompression context
func NewContext(ctx context.Context) context.Context {
return context.WithValue(ctx, ctxKey, &Stats{})
}
// GetStats returns decompression statistics from a context.
func GetStats(ctx context.Context) Stats {
d, ok := ctx.Value(ctxKey).(*Stats)
if !ok {
return Stats{}
}
return *d
}
// Mutate mutates the current context statistic using a mutator function
func Mutate(ctx context.Context, mutator func(m *Stats)) {
d, ok := ctx.Value(ctxKey).(*Stats)
if !ok {
return
}
mutator(d)
}

@ -13,10 +13,10 @@ import (
"github.com/pkg/errors"
"github.com/grafana/loki/pkg/chunkenc/decompression"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/stats"
)
const (
@ -477,7 +477,7 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
}
if !c.head.isEmpty() {
its = append(its, c.head.iterator(mint, maxt, filter))
its = append(its, c.head.iterator(ctx, mint, maxt, filter))
}
iterForward := iter.NewTimeRangedIterator(
@ -500,18 +500,21 @@ func (b block) iterator(ctx context.Context, pool ReaderPool, filter logql.Filte
return newBufferedIterator(ctx, pool, b.b, filter)
}
func (hb *headBlock) iterator(mint, maxt int64, filter logql.Filter) iter.EntryIterator {
func (hb *headBlock) iterator(ctx context.Context, mint, maxt int64, filter logql.Filter) iter.EntryIterator {
if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) {
return emptyIterator
}
chunkStats := stats.GetChunkData(ctx)
// We make a copy every time because b.entries could change completely;
// the alternative would be to allocate a new b.entries every time we cut a block,
// but the tradeoff is that queries against near-real-time data would be much slower
// than the cost of cutting blocks.
chunkStats.LinesUncompressed += int64(len(hb.entries))
entries := make([]entry, 0, len(hb.entries))
for _, e := range hb.entries {
chunkStats.BytesUncompressed += int64(len(e.s))
if filter == nil || filter([]byte(e.s)) {
entries = append(entries, e)
}
@ -558,9 +561,8 @@ func (li *listIterator) Close() error { return nil }
func (li *listIterator) Labels() string { return "" }
type bufferedIterator struct {
origBytes []byte
rootCtx context.Context
bytesDecompressed int64
origBytes []byte
stats *stats.ChunkData
bufReader *bufio.Reader
reader io.Reader
@ -579,8 +581,10 @@ type bufferedIterator struct {
}
func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte, filter logql.Filter) *bufferedIterator {
chunkStats := stats.GetChunkData(ctx)
chunkStats.BytesCompressed += int64(len(b))
return &bufferedIterator{
rootCtx: ctx,
stats: chunkStats,
origBytes: b,
reader: nil, // will be initialized later
bufReader: nil, // will be initialized later
@ -604,7 +608,8 @@ func (si *bufferedIterator) Next() bool {
return false
}
// we always decode the line length and ts as varints
si.bytesDecompressed += int64(len(line)) + 2*binary.MaxVarintLen64
si.stats.BytesDecompressed += int64(len(line)) + 2*binary.MaxVarintLen64
si.stats.LinesDecompressed++
if si.filter != nil && !si.filter(line) {
continue
}
@ -682,10 +687,6 @@ func (si *bufferedIterator) Close() error {
}
func (si *bufferedIterator) close() {
decompression.Mutate(si.rootCtx, func(current *decompression.Stats) {
current.BytesDecompressed += si.bytesDecompressed
current.BytesCompressed += int64(len(si.origBytes))
})
if si.reader != nil {
si.pool.PutReader(si.reader)
si.reader = nil

@ -539,7 +539,7 @@ func BenchmarkHeadBlockIterator(b *testing.B) {
b.ResetTimer()
for n := 0; n < b.N; n++ {
iter := h.iterator(0, math.MaxInt64, nil)
iter := h.iterator(context.Background(), 0, math.MaxInt64, nil)
for iter.Next() {
_ = iter.Entry()

@ -11,6 +11,7 @@ import (
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"golang.org/x/net/context"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc"
"github.com/cortexproject/cortex/pkg/chunk"
@ -252,6 +253,8 @@ type mockQuerierServer struct {
grpc.ServerStream
}
func (*mockQuerierServer) SetTrailer(metadata.MD) {}
func (m *mockQuerierServer) Send(resp *logproto.QueryResponse) error {
m.resps = append(m.resps, resp)
return nil

@ -23,6 +23,7 @@ import (
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/stats"
"github.com/grafana/loki/pkg/util"
)
@ -186,6 +187,10 @@ func (i *instance) getLabelsFromFingerprint(fp model.Fingerprint) labels.Labels
}
func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error {
// initialize stats collection for ingester queries and set grpc trailer with stats.
ctx := stats.NewContext(queryServer.Context())
defer stats.SendAsTrailer(ctx, queryServer)
expr, err := (logql.SelectParams{QueryRequest: req}).LogSelector()
if err != nil {
return err
@ -195,12 +200,13 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie
return err
}
ingStats := stats.GetIngesterData(ctx)
var iters []iter.EntryIterator
err = i.forMatchingStreams(
expr.Matchers(),
func(stream *stream) error {
iter, err := stream.Iterator(queryServer.Context(), req.Start, req.End, req.Direction, filter)
ingStats.TotalChunksMatched += int64(len(stream.chunks))
iter, err := stream.Iterator(ctx, req.Start, req.End, req.Direction, filter)
if err != nil {
return err
}
@ -212,10 +218,10 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie
return err
}
iter := iter.NewHeapIterator(queryServer.Context(), iters, req.Direction)
iter := iter.NewHeapIterator(ctx, iters, req.Direction)
defer helpers.LogError("closing iterator", iter.Close)
return sendBatches(iter, queryServer, req.Limit)
return sendBatches(ctx, iter, queryServer, req.Limit)
}
func (i *instance) Label(_ context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) {
@ -381,10 +387,28 @@ func isDone(ctx context.Context) bool {
}
}
func sendBatches(i iter.EntryIterator, queryServer logproto.Querier_QueryServer, limit uint32) error {
func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer logproto.Querier_QueryServer, limit uint32) error {
ingStats := stats.GetIngesterData(ctx)
if limit == 0 {
return sendAllBatches(i, queryServer)
// send all batches.
for !isDone(ctx) {
batch, size, err := iter.ReadBatch(i, queryBatchSize)
if err != nil {
return err
}
if len(batch.Streams) == 0 {
return nil
}
if err := queryServer.Send(batch); err != nil {
return err
}
ingStats.TotalLinesSent += int64(size)
ingStats.TotalBatches++
}
return nil
}
// send until the limit is reached.
sent := uint32(0)
for sent < limit && !isDone(queryServer.Context()) {
batch, batchSize, err := iter.ReadBatch(i, helpers.MinUint32(queryBatchSize, limit-sent))
@ -400,23 +424,8 @@ func sendBatches(i iter.EntryIterator, queryServer logproto.Querier_QueryServer,
if err := queryServer.Send(batch); err != nil {
return err
}
}
return nil
}
func sendAllBatches(i iter.EntryIterator, queryServer logproto.Querier_QueryServer) error {
for !isDone(queryServer.Context()) {
batch, _, err := iter.ReadBatch(i, queryBatchSize)
if err != nil {
return err
}
if len(batch.Streams) == 0 {
return nil
}
if err := queryServer.Send(batch); err != nil {
return err
}
ingStats.TotalLinesSent += int64(batchSize)
ingStats.TotalBatches++
}
return nil
}

@ -7,9 +7,9 @@ import (
"io"
"time"
"github.com/grafana/loki/pkg/chunkenc/decompression"
"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/stats"
)
// EntryIterator iterates over entries in time-order.
@ -132,19 +132,18 @@ type heapIterator struct {
}
is []EntryIterator
prefetched bool
ctx context.Context
stats *stats.ChunkData
tuples []tuple
currEntry logproto.Entry
currLabels string
errs []error
linesDuplicate int64
tuples []tuple
currEntry logproto.Entry
currLabels string
errs []error
}
// NewHeapIterator returns a new iterator which uses a heap to merge together
// entries for multiple iterators.
func NewHeapIterator(ctx context.Context, is []EntryIterator, direction logproto.Direction) HeapIterator {
result := &heapIterator{is: is, ctx: ctx}
result := &heapIterator{is: is, stats: stats.GetChunkData(ctx)}
switch direction {
case logproto.BACKWARD:
result.heap = &iteratorMaxHeap{}
@ -241,7 +240,7 @@ func (i *heapIterator) Next() bool {
i.requeue(i.tuples[j].EntryIterator, true)
continue
}
i.linesDuplicate++
i.stats.TotalDuplicates++
i.requeue(i.tuples[j].EntryIterator, false)
}
i.tuples = i.tuples[:0]
@ -311,9 +310,6 @@ func (i *heapIterator) Error() error {
}
func (i *heapIterator) Close() error {
decompression.Mutate(i.ctx, func(m *decompression.Stats) {
m.TotalDuplicates += i.linesDuplicate
})
for i.heap.Len() > 0 {
if err := i.heap.Pop().(EntryIterator).Close(); err != nil {
return err

@ -8,11 +8,10 @@ import (
"time"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/go-kit/kit/log/level"
"github.com/grafana/loki/pkg/chunkenc/decompression"
"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/stats"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@ -164,19 +163,13 @@ func (ng *Engine) exec(ctx context.Context, q *query) (promql.Value, error) {
return nil, err
}
ctx = decompression.NewContext(ctx)
ctx = stats.NewContext(ctx)
start := time.Now()
defer func() {
stats := decompression.GetStats(ctx)
level.Debug(log).Log(
"Time Fetching chunk (ms)", stats.TimeFetching.Nanoseconds()/int64(time.Millisecond),
"Total Duplicates", stats.TotalDuplicates,
"Fetched chunks", stats.FetchedChunks,
"Total bytes compressed (MB)", stats.BytesCompressed/1024/1024,
"Total bytes uncompressed (MB)", stats.BytesDecompressed/1024/1024,
"Total exec time (ms)", time.Since(start).Nanoseconds()/int64(time.Millisecond),
)
resultStats := stats.Snapshot(ctx, time.Since(start))
stats.Log(log, resultStats)
}()
switch e := expr.(type) {
case SampleExpr:
if err := ng.setupIterators(ctx, e, q); err != nil {

@ -0,0 +1,193 @@
/*
Package stats provides primitives for recording metrics across the query path.
Statistics are passed through the query context.
To start a new query statistics context use:
ctx := stats.NewContext(ctx)
Then you can update statistics by mutating the data returned by:
stats.GetChunkData(ctx)
stats.GetIngesterData(ctx)
stats.GetStoreData(ctx)
Finally, to get a snapshot of the current query statistics, use:
stats.Snapshot(ctx, time.Since(start))
Ingester statistics are sent across the gRPC stream using trailers;
see https://github.com/grpc/grpc-go/blob/master/Documentation/grpc-metadata.md
*/
package stats
import (
"context"
"time"
"github.com/dustin/go-humanize"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
)
type ctxKeyType string
const (
trailersKey ctxKeyType = "trailers"
chunksKey ctxKeyType = "chunks"
ingesterKey ctxKeyType = "ingester"
storeKey ctxKeyType = "store"
)
// Result contains LogQL query statistics.
type Result struct {
Ingester Ingester
Store Store
Summary Summary
}
// Log logs a query statistics result.
func Log(log log.Logger, r Result) {
level.Debug(log).Log(
"Ingester.TotalReached", r.Ingester.TotalReached,
"Ingester.TotalChunksMatched", r.Ingester.TotalChunksMatched,
"Ingester.TotalBatches", r.Ingester.TotalBatches,
"Ingester.TotalLinesSent", r.Ingester.TotalLinesSent,
"Ingester.BytesUncompressed", humanize.Bytes(uint64(r.Ingester.BytesUncompressed)),
"Ingester.LinesUncompressed", r.Ingester.LinesUncompressed,
"Ingester.BytesDecompressed", humanize.Bytes(uint64(r.Ingester.BytesDecompressed)),
"Ingester.LinesDecompressed", r.Ingester.LinesDecompressed,
"Ingester.BytesCompressed", humanize.Bytes(uint64(r.Ingester.BytesCompressed)),
"Ingester.TotalDuplicates", r.Ingester.TotalDuplicates,
"Store.TotalChunksRef", r.Store.TotalChunksRef,
"Store.TotalDownloadedChunks", r.Store.TotalDownloadedChunks,
"Store.TimeDownloadingChunks", r.Store.TimeDownloadingChunks,
"Store.BytesUncompressed", humanize.Bytes(uint64(r.Store.BytesUncompressed)),
"Store.LinesUncompressed", r.Store.LinesUncompressed,
"Store.BytesDecompressed", humanize.Bytes(uint64(r.Store.BytesDecompressed)),
"Store.LinesDecompressed", r.Store.LinesDecompressed,
"Store.BytesCompressed", humanize.Bytes(uint64(r.Store.BytesCompressed)),
"Store.TotalDuplicates", r.Store.TotalDuplicates,
"Summary.BytesProcessedPerSeconds", humanize.Bytes(uint64(r.Summary.BytesProcessedPerSeconds)),
"Summary.LinesProcessedPerSeconds", r.Summary.LinesProcessedPerSeconds,
"Summary.TotalBytesProcessed", humanize.Bytes(uint64(r.Summary.TotalBytesProcessed)),
"Summary.TotalLinesProcessed", r.Summary.TotalLinesProcessed,
"Summary.ExecTime", r.Summary.ExecTime,
)
}
// Summary is the summary of query statistics.
type Summary struct {
BytesProcessedPerSeconds int64 // Total bytes processed per second.
LinesProcessedPerSeconds int64 // Total lines processed per second.
TotalBytesProcessed int64 // Total bytes processed.
TotalLinesProcessed int64 // Total lines processed.
ExecTime time.Duration // Execution time.
}
// Ingester is the statistics result for ingester queries.
type Ingester struct {
IngesterData
ChunkData
TotalReached int
}
// Store is the statistics result of the store.
type Store struct {
StoreData
ChunkData
}
// NewContext creates a new statistics context
func NewContext(ctx context.Context) context.Context {
ctx = injectTrailerCollector(ctx)
ctx = context.WithValue(ctx, storeKey, &StoreData{})
ctx = context.WithValue(ctx, chunksKey, &ChunkData{})
ctx = context.WithValue(ctx, ingesterKey, &IngesterData{})
return ctx
}
// ChunkData contains chunks specific statistics.
type ChunkData struct {
BytesUncompressed int64 // Total bytes processed that were already in memory (found in the head chunk).
LinesUncompressed int64 // Total lines processed that were already in memory (found in the head chunk).
BytesDecompressed int64 // Total bytes decompressed and processed from chunks.
LinesDecompressed int64 // Total lines decompressed and processed from chunks.
BytesCompressed int64 // Total bytes of compressed chunks (blocks) processed.
TotalDuplicates int64 // Total duplicates found while processing.
}
// GetChunkData returns the chunks statistics data from the current context.
func GetChunkData(ctx context.Context) *ChunkData {
res, ok := ctx.Value(chunksKey).(*ChunkData)
if !ok {
return &ChunkData{}
}
return res
}
// IngesterData contains ingester specific statistics.
type IngesterData struct {
TotalChunksMatched int64 // Total chunks matched by the query from ingesters.
TotalBatches int64 // Total batches sent by ingesters.
TotalLinesSent int64 // Total lines sent by ingesters.
}
// GetIngesterData returns the ingester statistics data from the current context.
func GetIngesterData(ctx context.Context) *IngesterData {
res, ok := ctx.Value(ingesterKey).(*IngesterData)
if !ok {
return &IngesterData{}
}
return res
}
// StoreData contains store specific statistics.
type StoreData struct {
TotalChunksRef int64 // Total chunk references fetched from the index.
TotalDownloadedChunks int64 // Total number of chunks fetched.
TimeDownloadingChunks time.Duration // Time spent fetching chunks.
}
// GetStoreData returns the store statistics data from the current context.
func GetStoreData(ctx context.Context) *StoreData {
res, ok := ctx.Value(storeKey).(*StoreData)
if !ok {
return &StoreData{}
}
return res
}
// Snapshot computes query statistics from a context using the total exec time.
func Snapshot(ctx context.Context, execTime time.Duration) Result {
var res Result
// ingester data is decoded from grpc trailers.
res.Ingester = decodeTrailers(ctx)
// collect data from store.
s, ok := ctx.Value(storeKey).(*StoreData)
if ok {
res.Store.StoreData = *s
}
// collect data from chunks iteration.
c, ok := ctx.Value(chunksKey).(*ChunkData)
if ok {
res.Store.ChunkData = *c
}
// calculate the summary
res.Summary.TotalBytesProcessed = res.Store.BytesDecompressed + res.Store.BytesUncompressed +
res.Ingester.BytesDecompressed + res.Ingester.BytesUncompressed
res.Summary.BytesProcessedPerSeconds =
int64(float64(res.Summary.TotalBytesProcessed) /
execTime.Seconds())
res.Summary.TotalLinesProcessed = res.Store.LinesDecompressed + res.Store.LinesUncompressed +
res.Ingester.LinesDecompressed + res.Ingester.LinesUncompressed
res.Summary.LinesProcessedPerSeconds =
int64(float64(res.Summary.TotalLinesProcessed) /
execTime.Seconds())
res.Summary.ExecTime = execTime
return res
}
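
For readers following the new package, here is a minimal, hypothetical sketch of the lifecycle described in the package comment above: create a stats context, let the read path mutate the shared structs, then snapshot and log the result. The package and function names (example, runQuery) are illustrative only and are not part of this change.

package example

import (
    "context"
    "time"

    "github.com/go-kit/kit/log"
    "github.com/grafana/loki/pkg/logql/stats"
)

// runQuery is a hypothetical query function, used only to illustrate the
// intended lifecycle of a statistics context.
func runQuery(ctx context.Context, logger log.Logger) {
    // Attach a fresh statistics context to this query.
    ctx = stats.NewContext(ctx)
    start := time.Now()

    // ... execute the query; code on the read path mutates the shared
    // structs, e.g. stats.GetChunkData(ctx).TotalDuplicates++ ...

    // Take a snapshot of everything collected so far and log it.
    result := stats.Snapshot(ctx, time.Since(start))
    stats.Log(logger, result)
}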

@ -0,0 +1,92 @@
package stats
import (
"context"
"testing"
"time"
jsoniter "github.com/json-iterator/go"
"github.com/stretchr/testify/require"
)
func TestSnapshot(t *testing.T) {
ctx := NewContext(context.Background())
GetChunkData(ctx).BytesUncompressed += 10
GetChunkData(ctx).LinesUncompressed += 20
GetChunkData(ctx).BytesDecompressed += 40
GetChunkData(ctx).LinesDecompressed += 20
GetChunkData(ctx).BytesCompressed += 30
GetChunkData(ctx).TotalDuplicates += 10
GetStoreData(ctx).TotalChunksRef += 50
GetStoreData(ctx).TotalDownloadedChunks += 60
GetStoreData(ctx).TimeDownloadingChunks += time.Second
fakeIngesterQuery(ctx)
fakeIngesterQuery(ctx)
res := Snapshot(ctx, 2*time.Second)
expected := Result{
Ingester: Ingester{
IngesterData: IngesterData{
TotalChunksMatched: 200,
TotalBatches: 50,
TotalLinesSent: 60,
},
ChunkData: ChunkData{
BytesUncompressed: 10,
LinesUncompressed: 20,
BytesDecompressed: 24,
LinesDecompressed: 40,
BytesCompressed: 60,
TotalDuplicates: 2,
},
TotalReached: 2,
},
Store: Store{
StoreData: StoreData{
TotalChunksRef: 50,
TotalDownloadedChunks: 60,
TimeDownloadingChunks: time.Second,
},
ChunkData: ChunkData{
BytesUncompressed: 10,
LinesUncompressed: 20,
BytesDecompressed: 40,
LinesDecompressed: 20,
BytesCompressed: 30,
TotalDuplicates: 10,
},
},
Summary: Summary{
ExecTime: 2 * time.Second,
BytesProcessedPerSeconds: int64(42),
LinesProcessedPerSeconds: int64(50),
TotalBytesProcessed: int64(84),
TotalLinesProcessed: int64(100),
},
}
require.Equal(t, expected, res)
}
func fakeIngesterQuery(ctx context.Context) {
d, _ := ctx.Value(trailersKey).(*trailerCollector)
meta := d.addTrailer()
c, _ := jsoniter.MarshalToString(ChunkData{
BytesUncompressed: 5,
LinesUncompressed: 10,
BytesDecompressed: 12,
LinesDecompressed: 20,
BytesCompressed: 30,
TotalDuplicates: 1,
})
meta.Set(chunkDataKey, c)
i, _ := jsoniter.MarshalToString(IngesterData{
TotalChunksMatched: 100,
TotalBatches: 25,
TotalLinesSent: 30,
})
meta.Set(ingesterDataKey, i)
}

@ -0,0 +1,113 @@
package stats
import (
"context"
"sync"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
jsoniter "github.com/json-iterator/go"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
const (
ingesterDataKey = "ingester_data"
chunkDataKey = "chunk_data"
)
type trailerCollector struct {
trailers []*metadata.MD
sync.Mutex
}
func (c *trailerCollector) addTrailer() *metadata.MD {
c.Lock()
defer c.Unlock()
meta := metadata.MD{}
c.trailers = append(c.trailers, &meta)
return &meta
}
func injectTrailerCollector(ctx context.Context) context.Context {
return context.WithValue(ctx, trailersKey, &trailerCollector{})
}
// CollectTrailer registers a new trailer that can be collected by the engine.
func CollectTrailer(ctx context.Context) grpc.CallOption {
d, ok := ctx.Value(trailersKey).(*trailerCollector)
if !ok {
return grpc.EmptyCallOption{}
}
return grpc.Trailer(d.addTrailer())
}
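// SendAsTrailer sends the query statistics collected in ctx as a gRPC trailer
// on the given server stream.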
func SendAsTrailer(ctx context.Context, stream grpc.ServerStream) {
trailer, err := encodeTrailer(ctx)
if err != nil {
level.Warn(util.Logger).Log("msg", "failed to encode trailer", "err", err)
return
}
stream.SetTrailer(trailer)
}
func encodeTrailer(ctx context.Context) (metadata.MD, error) {
meta := metadata.MD{}
ingData, ok := ctx.Value(ingesterKey).(*IngesterData)
if ok {
data, err := jsoniter.MarshalToString(ingData)
if err != nil {
return meta, err
}
meta.Set(ingesterDataKey, data)
}
chunkData, ok := ctx.Value(chunksKey).(*ChunkData)
if ok {
data, err := jsoniter.MarshalToString(chunkData)
if err != nil {
return meta, err
}
meta.Set(chunkDataKey, data)
}
return meta, nil
}
func decodeTrailers(ctx context.Context) Ingester {
var res Ingester
collector, ok := ctx.Value(trailersKey).(*trailerCollector)
if !ok {
return res
}
res.TotalReached = len(collector.trailers)
for _, meta := range collector.trailers {
ing := decodeTrailer(meta)
res.TotalChunksMatched += ing.TotalChunksMatched
res.TotalBatches += ing.TotalBatches
res.TotalLinesSent += ing.TotalLinesSent
res.BytesUncompressed += ing.BytesUncompressed
res.LinesUncompressed += ing.LinesUncompressed
res.BytesDecompressed += ing.BytesDecompressed
res.LinesDecompressed += ing.LinesDecompressed
res.BytesCompressed += ing.BytesCompressed
res.TotalDuplicates += ing.TotalDuplicates
}
return res
}
func decodeTrailer(meta *metadata.MD) Ingester {
var res Ingester
values := meta.Get(ingesterDataKey)
if len(values) == 1 {
if err := jsoniter.UnmarshalFromString(values[0], &res.IngesterData); err != nil {
level.Warn(util.Logger).Log("msg", "could not unmarshal ingester data", "err", err)
}
}
values = meta.Get(chunkDataKey)
if len(values) == 1 {
if err := jsoniter.UnmarshalFromString(values[0], &res.ChunkData); err != nil {
level.Warn(util.Logger).Log("msg", "could not unmarshal chunk data", "err", err)
}
}
return res
}
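
To make the trailer round trip easier to follow, here is a condensed, hypothetical sketch of both halves as they are wired up elsewhere in this change (instance.go on the ingester side, querier.go on the client side). The names example, fakeIngester and queryIngester are illustrative only.

package example

import (
    "context"

    "github.com/grafana/loki/pkg/logproto"
    "github.com/grafana/loki/pkg/logql/stats"
)

// fakeIngester illustrates the server half of the exchange: collect
// statistics for the query and ship them back as a gRPC trailer when the
// stream ends.
type fakeIngester struct{}

func (fakeIngester) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error {
    ctx := stats.NewContext(queryServer.Context())
    defer stats.SendAsTrailer(ctx, queryServer)

    stats.GetIngesterData(ctx).TotalBatches++
    // ... stream batches to the querier ...
    return nil
}

// queryIngester illustrates the client half: register a trailer collector
// for each RPC so the engine can later aggregate it via Snapshot.
func queryIngester(ctx context.Context, client logproto.QuerierClient, req *logproto.QueryRequest) error {
    ctx = stats.NewContext(ctx)
    _, err := client.Query(ctx, req, stats.CollectTrailer(ctx))
    return err
}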

@ -0,0 +1,110 @@
package stats
import (
"context"
"io"
"log"
"net"
"testing"
"github.com/grafana/loki/pkg/logproto"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/test/bufconn"
)
const bufSize = 1024 * 1024
var lis *bufconn.Listener
var server *grpc.Server
func init() {
lis = bufconn.Listen(bufSize)
server = grpc.NewServer()
}
func bufDialer(context.Context, string) (net.Conn, error) {
return lis.Dial()
}
func TestCollectTrailer(t *testing.T) {
ctx := context.Background()
conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithInsecure())
if err != nil {
t.Fatalf("Failed to dial bufnet: %v", err)
}
defer conn.Close()
ing := ingesterFn(func(req *logproto.QueryRequest, s logproto.Querier_QueryServer) error {
ingCtx := NewContext(s.Context())
defer SendAsTrailer(ingCtx, s)
GetIngesterData(ingCtx).TotalChunksMatched++
GetIngesterData(ingCtx).TotalBatches += 2
GetIngesterData(ingCtx).TotalLinesSent += 3
GetChunkData(ingCtx).BytesUncompressed++
GetChunkData(ingCtx).LinesUncompressed++
GetChunkData(ingCtx).BytesDecompressed++
GetChunkData(ingCtx).LinesDecompressed++
GetChunkData(ingCtx).BytesCompressed++
GetChunkData(ingCtx).TotalDuplicates++
return nil
})
logproto.RegisterQuerierServer(server, ing)
go func() {
if err := server.Serve(lis); err != nil {
log.Fatalf("Server exited with error: %v", err)
}
}()
ingClient := logproto.NewQuerierClient(conn)
ctx = NewContext(ctx)
// query the ingester twice.
clientStream, err := ingClient.Query(ctx, &logproto.QueryRequest{}, CollectTrailer(ctx))
if err != nil {
t.Fatal(err)
}
_, err = clientStream.Recv()
if err != nil && err != io.EOF {
t.Fatal(err)
}
clientStream, err = ingClient.Query(ctx, &logproto.QueryRequest{}, CollectTrailer(ctx))
if err != nil {
t.Fatal(err)
}
_, err = clientStream.Recv()
if err != nil && err != io.EOF {
t.Fatal(err)
}
err = clientStream.CloseSend()
if err != nil {
t.Fatal(err)
}
res := decodeTrailers(ctx)
require.Equal(t, 2, res.TotalReached)
require.Equal(t, int64(2), res.TotalChunksMatched)
require.Equal(t, int64(4), res.TotalBatches)
require.Equal(t, int64(6), res.TotalLinesSent)
require.Equal(t, int64(2), res.BytesUncompressed)
require.Equal(t, int64(2), res.LinesUncompressed)
require.Equal(t, int64(2), res.BytesDecompressed)
require.Equal(t, int64(2), res.LinesDecompressed)
require.Equal(t, int64(2), res.BytesCompressed)
require.Equal(t, int64(2), res.TotalDuplicates)
}
type ingesterFn func(*logproto.QueryRequest, logproto.Querier_QueryServer) error
func (i ingesterFn) Query(req *logproto.QueryRequest, s logproto.Querier_QueryServer) error {
return i(req, s)
}
func (ingesterFn) Label(context.Context, *logproto.LabelRequest) (*logproto.LabelResponse, error) {
return nil, nil
}
func (ingesterFn) Tail(*logproto.TailRequest, logproto.Querier_TailServer) error { return nil }
func (ingesterFn) Series(context.Context, *logproto.SeriesRequest) (*logproto.SeriesResponse, error) {
return nil, nil
}
func (ingesterFn) TailersCount(context.Context, *logproto.TailersCountRequest) (*logproto.TailersCountResponse, error) {
return nil, nil
}

@ -23,6 +23,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/marshal"
"github.com/grafana/loki/pkg/logql/stats"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/util/validation"
)
@ -172,7 +173,7 @@ func (q *Querier) Select(ctx context.Context, params logql.SelectParams) (iter.E
func (q *Querier) queryIngesters(ctx context.Context, params logql.SelectParams) ([]iter.EntryIterator, error) {
clients, err := q.forAllIngesters(ctx, func(client logproto.QuerierClient) (interface{}, error) {
return client.Query(ctx, params.QueryRequest)
return client.Query(ctx, params.QueryRequest, stats.CollectTrailer(ctx))
})
if err != nil {
return nil, err

@ -13,10 +13,10 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/chunkenc/decompression"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/stats"
)
// lazyChunks is a slice of lazy chunks that can be ordered by chunk boundaries
@ -366,11 +366,13 @@ func fetchLazyChunks(ctx context.Context, chunks []*chunkenc.LazyChunk) error {
log, ctx := spanlogger.New(ctx, "LokiStore.fetchLazyChunks")
defer log.Finish()
start := time.Now()
defer decompression.Mutate(ctx, func(m *decompression.Stats) {
m.TimeFetching += time.Since(start)
})
storeStats := stats.GetStoreData(ctx)
var totalChunks int64
defer func() {
storeStats.TimeDownloadingChunks += time.Since(start)
storeStats.TotalDownloadedChunks += totalChunks
}()
var totalChunks int
chksByFetcher := map[*chunk.Fetcher][]*chunkenc.LazyChunk{}
for _, c := range chunks {
if c.Chunk.Data == nil {
@ -419,9 +421,6 @@ func fetchLazyChunks(ctx context.Context, chunks []*chunkenc.LazyChunk) error {
lastErr = err
}
}
decompression.Mutate(ctx, func(m *decompression.Stats) {
m.FetchedChunks += int64(totalChunks)
})
return lastErr
}

@ -14,6 +14,7 @@ import (
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/stats"
"github.com/grafana/loki/pkg/util"
)
@ -55,6 +56,8 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf
// LazyQuery returns an iterator that will query the store for more chunks while iterating instead of fetching all chunks upfront
// for that request.
func (s *store) LazyQuery(ctx context.Context, req logql.SelectParams) (iter.EntryIterator, error) {
storeStats := stats.GetStoreData(ctx)
expr, err := req.LogSelector()
if err != nil {
return nil, err
@ -85,6 +88,7 @@ func (s *store) LazyQuery(ctx context.Context, req logql.SelectParams) (iter.Ent
var totalChunks int
for i := range chks {
storeStats.TotalChunksRef += int64(len(chks[i]))
chks[i] = filterChunksByTime(from, through, chks[i])
totalChunks += len(chks[i])
}

@ -0,0 +1,308 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package bufconn provides a net.Conn implemented by a buffer and related
// dialing and listening functionality.
package bufconn
import (
"fmt"
"io"
"net"
"sync"
"time"
)
// Listener implements a net.Listener that creates local, buffered net.Conns
// via its Accept and Dial method.
type Listener struct {
mu sync.Mutex
sz int
ch chan net.Conn
done chan struct{}
}
// Implementation of net.Error providing timeout
type netErrorTimeout struct {
error
}
func (e netErrorTimeout) Timeout() bool { return true }
func (e netErrorTimeout) Temporary() bool { return false }
var errClosed = fmt.Errorf("closed")
var errTimeout net.Error = netErrorTimeout{error: fmt.Errorf("i/o timeout")}
// Listen returns a Listener that can only be contacted by its own Dialers and
// creates buffered connections between the two.
func Listen(sz int) *Listener {
return &Listener{sz: sz, ch: make(chan net.Conn), done: make(chan struct{})}
}
// Accept blocks until Dial is called, then returns a net.Conn for the server
// half of the connection.
func (l *Listener) Accept() (net.Conn, error) {
select {
case <-l.done:
return nil, errClosed
case c := <-l.ch:
return c, nil
}
}
// Close stops the listener.
func (l *Listener) Close() error {
l.mu.Lock()
defer l.mu.Unlock()
select {
case <-l.done:
// Already closed.
break
default:
close(l.done)
}
return nil
}
// Addr reports the address of the listener.
func (l *Listener) Addr() net.Addr { return addr{} }
// Dial creates an in-memory full-duplex network connection, unblocks Accept by
// providing it the server half of the connection, and returns the client half
// of the connection.
func (l *Listener) Dial() (net.Conn, error) {
p1, p2 := newPipe(l.sz), newPipe(l.sz)
select {
case <-l.done:
return nil, errClosed
case l.ch <- &conn{p1, p2}:
return &conn{p2, p1}, nil
}
}
type pipe struct {
mu sync.Mutex
// buf contains the data in the pipe. It is a ring buffer of fixed capacity,
// with r and w pointing to the offset to read and write, respectively.
//
// Data is read between [r, w) and written to [w, r), wrapping around the end
// of the slice if necessary.
//
// The buffer is empty if r == len(buf), otherwise if r == w, it is full.
//
// w and r are always in the range [0, cap(buf)) and [0, len(buf)].
buf []byte
w, r int
wwait sync.Cond
rwait sync.Cond
// Indicate that a write/read timeout has occurred
wtimedout bool
rtimedout bool
wtimer *time.Timer
rtimer *time.Timer
closed bool
writeClosed bool
}
func newPipe(sz int) *pipe {
p := &pipe{buf: make([]byte, 0, sz)}
p.wwait.L = &p.mu
p.rwait.L = &p.mu
p.wtimer = time.AfterFunc(0, func() {})
p.rtimer = time.AfterFunc(0, func() {})
return p
}
func (p *pipe) empty() bool {
return p.r == len(p.buf)
}
func (p *pipe) full() bool {
return p.r < len(p.buf) && p.r == p.w
}
func (p *pipe) Read(b []byte) (n int, err error) {
p.mu.Lock()
defer p.mu.Unlock()
// Block until p has data.
for {
if p.closed {
return 0, io.ErrClosedPipe
}
if !p.empty() {
break
}
if p.writeClosed {
return 0, io.EOF
}
if p.rtimedout {
return 0, errTimeout
}
p.rwait.Wait()
}
wasFull := p.full()
n = copy(b, p.buf[p.r:len(p.buf)])
p.r += n
if p.r == cap(p.buf) {
p.r = 0
p.buf = p.buf[:p.w]
}
// Signal a blocked writer, if any
if wasFull {
p.wwait.Signal()
}
return n, nil
}
func (p *pipe) Write(b []byte) (n int, err error) {
p.mu.Lock()
defer p.mu.Unlock()
if p.closed {
return 0, io.ErrClosedPipe
}
for len(b) > 0 {
// Block until p is not full.
for {
if p.closed || p.writeClosed {
return 0, io.ErrClosedPipe
}
if !p.full() {
break
}
if p.wtimedout {
return 0, errTimeout
}
p.wwait.Wait()
}
wasEmpty := p.empty()
end := cap(p.buf)
if p.w < p.r {
end = p.r
}
x := copy(p.buf[p.w:end], b)
b = b[x:]
n += x
p.w += x
if p.w > len(p.buf) {
p.buf = p.buf[:p.w]
}
if p.w == cap(p.buf) {
p.w = 0
}
// Signal a blocked reader, if any.
if wasEmpty {
p.rwait.Signal()
}
}
return n, nil
}
func (p *pipe) Close() error {
p.mu.Lock()
defer p.mu.Unlock()
p.closed = true
// Signal all blocked readers and writers to return an error.
p.rwait.Broadcast()
p.wwait.Broadcast()
return nil
}
func (p *pipe) closeWrite() error {
p.mu.Lock()
defer p.mu.Unlock()
p.writeClosed = true
// Signal all blocked readers and writers to return an error.
p.rwait.Broadcast()
p.wwait.Broadcast()
return nil
}
type conn struct {
io.Reader
io.Writer
}
func (c *conn) Close() error {
err1 := c.Reader.(*pipe).Close()
err2 := c.Writer.(*pipe).closeWrite()
if err1 != nil {
return err1
}
return err2
}
func (c *conn) SetDeadline(t time.Time) error {
c.SetReadDeadline(t)
c.SetWriteDeadline(t)
return nil
}
func (c *conn) SetReadDeadline(t time.Time) error {
p := c.Reader.(*pipe)
p.mu.Lock()
defer p.mu.Unlock()
p.rtimer.Stop()
p.rtimedout = false
if !t.IsZero() {
p.rtimer = time.AfterFunc(time.Until(t), func() {
p.mu.Lock()
defer p.mu.Unlock()
p.rtimedout = true
p.rwait.Broadcast()
})
}
return nil
}
func (c *conn) SetWriteDeadline(t time.Time) error {
p := c.Writer.(*pipe)
p.mu.Lock()
defer p.mu.Unlock()
p.wtimer.Stop()
p.wtimedout = false
if !t.IsZero() {
p.wtimer = time.AfterFunc(time.Until(t), func() {
p.mu.Lock()
defer p.mu.Unlock()
p.wtimedout = true
p.wwait.Broadcast()
})
}
return nil
}
func (*conn) LocalAddr() net.Addr { return addr{} }
func (*conn) RemoteAddr() net.Addr { return addr{} }
type addr struct{}
func (addr) Network() string { return "bufconn" }
func (addr) String() string { return "bufconn" }

@ -830,6 +830,7 @@ google.golang.org/grpc/serviceconfig
google.golang.org/grpc/stats
google.golang.org/grpc/status
google.golang.org/grpc/tap
google.golang.org/grpc/test/bufconn
# gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/alecthomas/kingpin.v2
# gopkg.in/fsnotify.v1 v1.4.7
