loki/pkg/logqlmodel/stats/context_test.go

package stats

import (
	"context"
	"testing"
	"time"

	util_log "github.com/cortexproject/cortex/pkg/util/log"
	jsoniter "github.com/json-iterator/go"
	"github.com/stretchr/testify/require"
)
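
// TestSnapshot records chunk and store stats directly on the context, simulates
// two ingester responses via fakeIngesterQuery, and checks that Snapshot merges
// both sources into a single Result.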
func TestSnapshot(t *testing.T) {
	ctx := NewContext(context.Background())

	GetChunkData(ctx).HeadChunkBytes += 10
	GetChunkData(ctx).HeadChunkLines += 20
	GetChunkData(ctx).DecompressedBytes += 40
	GetChunkData(ctx).DecompressedLines += 20
	GetChunkData(ctx).CompressedBytes += 30
	GetChunkData(ctx).TotalDuplicates += 10

	GetStoreData(ctx).TotalChunksRef += 50
	GetStoreData(ctx).TotalChunksDownloaded += 60
	GetStoreData(ctx).ChunksDownloadTime += time.Second

	fakeIngesterQuery(ctx)
	fakeIngesterQuery(ctx)

	res := Snapshot(ctx, 2*time.Second)
	res.Log(util_log.Logger)

	expected := Result{
		Ingester: Ingester{
			TotalChunksMatched: 200,
			TotalBatches:       50,
			TotalLinesSent:     60,
			HeadChunkBytes:     10,
			HeadChunkLines:     20,
			DecompressedBytes:  24,
			DecompressedLines:  40,
			CompressedBytes:    60,
			TotalDuplicates:    2,
			TotalReached:       2,
		},
		Store: Store{
			TotalChunksRef:        50,
			TotalChunksDownloaded: 60,
			ChunksDownloadTime:    time.Second.Seconds(),
			HeadChunkBytes:        10,
			HeadChunkLines:        20,
			DecompressedBytes:     40,
			DecompressedLines:     20,
			CompressedBytes:       30,
			TotalDuplicates:       10,
		},
		Summary: Summary{
			ExecTime:                2 * time.Second.Seconds(),
			BytesProcessedPerSecond: int64(42),
			LinesProcessedPerSecond: int64(50),
			TotalBytesProcessed:     int64(84),
			TotalLinesProcessed:     int64(100),
		},
	}
	require.Equal(t, expected, res)
}
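
// TestSnapshot_MergesResults joins an already-populated Result into the
// context and checks that Snapshot returns it unchanged.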
func TestSnapshot_MergesResults(t *testing.T) {
	ctx := NewContext(context.Background())
	expected := Result{
		Ingester: Ingester{
			TotalChunksMatched: 200,
			TotalBatches:       50,
			TotalLinesSent:     60,
			HeadChunkBytes:     10,
			HeadChunkLines:     20,
			DecompressedBytes:  24,
			DecompressedLines:  40,
			CompressedBytes:    60,
			TotalDuplicates:    2,
			TotalReached:       2,
		},
		Store: Store{
			TotalChunksRef:        50,
			TotalChunksDownloaded: 60,
			ChunksDownloadTime:    time.Second.Seconds(),
			HeadChunkBytes:        10,
			HeadChunkLines:        20,
			DecompressedBytes:     40,
			DecompressedLines:     20,
			CompressedBytes:       30,
			TotalDuplicates:       10,
		},
		Summary: Summary{
			ExecTime:                2 * time.Second.Seconds(),
			BytesProcessedPerSecond: int64(42),
			LinesProcessedPerSecond: int64(50),
			TotalBytesProcessed:     int64(84),
			TotalLinesProcessed:     int64(100),
		},
	}

	err := JoinResults(ctx, expected)
	require.Nil(t, err)

	res := Snapshot(ctx, 2*time.Second)
	require.Equal(t, expected, res)
}
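
// TestGetResult_ErrsNonexistant asserts that GetResult returns an error when
// no result has been stored on the context.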
func TestGetResult_ErrsNonexistant(t *testing.T) {
	out, err := GetResult(context.Background())
	require.NotNil(t, err)
	require.Nil(t, out)
}
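
// fakeIngesterQuery emulates an ingester response by writing serialized
// ChunkData and IngesterData into a trailer attached to the context.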
func fakeIngesterQuery(ctx context.Context) {
	d, _ := ctx.Value(trailersKey).(*trailerCollector)
	meta := d.addTrailer()

	c, _ := jsoniter.MarshalToString(ChunkData{
		HeadChunkBytes:    5,
		HeadChunkLines:    10,
		DecompressedBytes: 12,
		DecompressedLines: 20,
		CompressedBytes:   30,
		TotalDuplicates:   1,
	})
	meta.Set(chunkDataKey, c)

	i, _ := jsoniter.MarshalToString(IngesterData{
		TotalChunksMatched: 100,
		TotalBatches:       25,
		TotalLinesSent:     30,
	})
	meta.Set(ingesterDataKey, i)
}
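
// TestResult_Merge checks that merging a zero Result is a no-op, that merging
// into a zero Result copies it, and that a second merge doubles every counter
// while the per-second rates stay constant.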
func TestResult_Merge(t *testing.T) {
	var res Result

	res.Merge(res) // testing zero.
	require.Equal(t, res, res)

	toMerge := Result{
		Ingester: Ingester{
			TotalChunksMatched: 200,
			TotalBatches:       50,
			TotalLinesSent:     60,
			HeadChunkBytes:     10,
			HeadChunkLines:     20,
			DecompressedBytes:  24,
			DecompressedLines:  40,
			CompressedBytes:    60,
			TotalDuplicates:    2,
			TotalReached:       2,
		},
		Store: Store{
			TotalChunksRef:        50,
			TotalChunksDownloaded: 60,
			ChunksDownloadTime:    time.Second.Seconds(),
			HeadChunkBytes:        10,
			HeadChunkLines:        20,
			DecompressedBytes:     40,
			DecompressedLines:     20,
			CompressedBytes:       30,
			TotalDuplicates:       10,
		},
		Summary: Summary{
			ExecTime:                2 * time.Second.Seconds(),
			BytesProcessedPerSecond: int64(42),
			LinesProcessedPerSecond: int64(50),
			TotalBytesProcessed:     int64(84),
			TotalLinesProcessed:     int64(100),
		},
	}

	res.Merge(toMerge)
	require.Equal(t, toMerge, res)

	// merge again
	res.Merge(toMerge)
	require.Equal(t, Result{
		Ingester: Ingester{
			TotalChunksMatched: 2 * 200,
			TotalBatches:       2 * 50,
			TotalLinesSent:     2 * 60,
			HeadChunkBytes:     2 * 10,
			HeadChunkLines:     2 * 20,
			DecompressedBytes:  2 * 24,
			DecompressedLines:  2 * 40,
			CompressedBytes:    2 * 60,
			TotalDuplicates:    2 * 2,
			TotalReached:       2 * 2,
		},
		Store: Store{
			TotalChunksRef:        2 * 50,
			TotalChunksDownloaded: 2 * 60,
			ChunksDownloadTime:    2 * time.Second.Seconds(),
			HeadChunkBytes:        2 * 10,
			HeadChunkLines:        2 * 20,
			DecompressedBytes:     2 * 40,
			DecompressedLines:     2 * 20,
			CompressedBytes:       2 * 30,
			TotalDuplicates:       2 * 10,
		},
		Summary: Summary{
			ExecTime:                2 * 2 * time.Second.Seconds(),
			BytesProcessedPerSecond: int64(42), // 2 requests at the same pace should give the same bytes/lines per sec
			LinesProcessedPerSecond: int64(50),
			TotalBytesProcessed:     2 * int64(84),
			TotalLinesProcessed:     2 * int64(100),
		},
	}, res)
}
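
For reference, here is a minimal sketch of how calling code might drive the API exercised above, using only the functions these tests already rely on (NewContext, GetChunkData, Snapshot, Result.Log). The queryWithStats wrapper and the counter values are illustrative assumptions, not part of the package:

// queryWithStats is a hypothetical wrapper sketched for illustration; it only
// mirrors the calls used in the tests above and is not part of this package.
func queryWithStats() Result {
	// Attach a stats collector to the request context.
	ctx := NewContext(context.Background())

	// Components increment counters on the context as the query runs
	// (the values here are placeholders).
	GetChunkData(ctx).DecompressedBytes += 1024
	GetChunkData(ctx).DecompressedLines += 100

	// At the end of the query, collapse everything recorded on the context
	// into a single Result and log it.
	res := Snapshot(ctx, 250*time.Millisecond)
	res.Log(util_log.Logger)
	return res
}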