diff --git a/pkg/logql/downstream.go b/pkg/logql/downstream.go index 27cb3e849f..b7d37390d1 100644 --- a/pkg/logql/downstream.go +++ b/pkg/logql/downstream.go @@ -235,6 +235,12 @@ type DownstreamQuery struct { Params Params } +type Resp struct { + I int + Res logqlmodel.Result + Err error +} + // Downstreamer is an interface for deferring responsibility for query execution. // It is decoupled from but consumed by a downStreamEvaluator to dispatch ASTs. type Downstreamer interface { @@ -375,24 +381,18 @@ func (ev *DownstreamEvaluator) NewStepEvaluator( results, err := ev.Downstream(ctx, queries) if err != nil { - return nil, fmt.Errorf("error running quantile sketch downstream query: %w", err) + return nil, err } - xs := make([]StepEvaluator, 0, len(queries)) - for _, res := range results { - if res.Data.Type() != QuantileSketchMatrixType { - return nil, fmt.Errorf("unexpected matrix data type: got (%s), want (%s)", res.Data.Type(), QuantileSketchMatrixType) - } - data, ok := res.Data.(ProbabilisticQuantileMatrix) - if !ok { - return nil, fmt.Errorf("unexpected matrix type: got (%T), want (ProbabilisticQuantileMatrix)", res.Data) - } - stepper := NewQuantileSketchMatrixStepEvaluator(data, params) - xs = append(xs, stepper) + if len(results) != 1 { + return nil, fmt.Errorf("unexpected results length for sharded quantile: got (%d), want (1)", len(results)) } - inner := NewQuantileSketchMergeStepEvaluator(xs) - + matrix, ok := results[0].Data.(ProbabilisticQuantileMatrix) + if !ok { + return nil, fmt.Errorf("unexpected matrix type: got (%T), want (ProbabilisticQuantileMatrix)", results[0].Data) + } + inner := NewQuantileSketchMatrixStepEvaluator(matrix, params) return NewQuantileSketchVectorStepEvaluator(inner, *e.quantile), nil default: diff --git a/pkg/logql/quantile_over_time_sketch.go b/pkg/logql/quantile_over_time_sketch.go index 121a3e5511..3d469de407 100644 --- a/pkg/logql/quantile_over_time_sketch.go +++ b/pkg/logql/quantile_over_time_sketch.go @@ -3,6 +3,7 @@ package logql import ( "fmt" "math" + "sync" "time" "github.com/prometheus/prometheus/model/labels" @@ -23,9 +24,17 @@ const ( type ProbabilisticQuantileVector []ProbabilisticQuantileSample type ProbabilisticQuantileMatrix []ProbabilisticQuantileVector +var streamHashPool = sync.Pool{ + New: func() interface{} { return make(map[uint64]int) }, +} + func (q ProbabilisticQuantileVector) Merge(right ProbabilisticQuantileVector) (ProbabilisticQuantileVector, error) { // labels hash to vector index map - groups := make(map[uint64]int) + groups := streamHashPool.Get().(map[uint64]int) + defer func() { + clear(groups) + streamHashPool.Put(groups) + }() for i, sample := range q { groups[sample.Metric.Hash()] = i } @@ -80,6 +89,21 @@ func (ProbabilisticQuantileMatrix) String() string { return "QuantileSketchMatrix()" } +func (m ProbabilisticQuantileMatrix) Merge(right ProbabilisticQuantileMatrix) (ProbabilisticQuantileMatrix, error) { + if len(m) != len(right) { + return nil, fmt.Errorf("failed to merge probabilistic quantile matrix: lengths differ %d!=%d", len(m), len(right)) + } + var err error + for i, vec := range m { + m[i], err = vec.Merge(right[i]) + if err != nil { + return nil, fmt.Errorf("failed to merge probabilistic quantile matrix: %w", err) + } + } + + return m, nil +} + func (ProbabilisticQuantileMatrix) Type() promql_parser.ValueType { return QuantileSketchMatrixType } func (m ProbabilisticQuantileMatrix) Release() { @@ -398,6 +422,9 @@ func NewQuantileSketchVectorStepEvaluator(inner StepEvaluator, quantile float64) func (e *QuantileSketchVectorStepEvaluator) Next() (bool, int64, StepResult) { ok, ts, r := e.inner.Next() + if !ok { + return false, 0, SampleVector{} + } quantileSketchVec := r.QuantileSketchVec() vec := make(promql.Vector, len(quantileSketchVec)) diff --git a/pkg/logql/test_utils.go b/pkg/logql/test_utils.go index b979dedb42..82442e09bf 100644 --- a/pkg/logql/test_utils.go +++ b/pkg/logql/test_utils.go @@ -225,6 +225,16 @@ func (m MockDownstreamer) Downstream(ctx context.Context, queries []DownstreamQu results = append(results, res) } + + if matrix, ok := results[0].Data.(ProbabilisticQuantileMatrix); ok { + if len(results) == 1 { + return results, nil + } + for _, m := range results[1:] { + matrix, _ = matrix.Merge(m.Data.(ProbabilisticQuantileMatrix)) + } + return []logqlmodel.Result{{Data: matrix}}, nil + } return results, nil } diff --git a/pkg/querier/queryrange/downstreamer.go b/pkg/querier/queryrange/downstreamer.go index 860aa980fb..d8514e8a4e 100644 --- a/pkg/querier/queryrange/downstreamer.go +++ b/pkg/querier/queryrange/downstreamer.go @@ -9,6 +9,7 @@ import ( "time" "github.com/go-kit/log/level" + "github.com/grafana/dskit/concurrency" "github.com/grafana/dskit/tenant" "github.com/opentracing/opentracing-go" "github.com/prometheus/prometheus/model/labels" @@ -125,58 +126,46 @@ func (in instance) For( queries []logql.DownstreamQuery, fn func(logql.DownstreamQuery) (logqlmodel.Result, error), ) ([]logqlmodel.Result, error) { - type resp struct { - i int - res logqlmodel.Result - err error - } - ctx, cancel := context.WithCancel(ctx) defer cancel() - ch := make(chan resp) - // Make one goroutine to dispatch the other goroutines, bounded by instance parallelism + ch := make(chan logql.Resp) + + // ForEachJob blocks until all are done. However, we want to process the + // results as they come in. That's why we start everything in another + // gorouting. go func() { - for i := 0; i < len(queries); i++ { + err := concurrency.ForEachJob(ctx, len(queries), in.parallelism, func(ctx context.Context, i int) error { + res, err := fn(queries[i]) + response := logql.Resp{ + I: i, + Res: res, + Err: err, + } + + // Feed the result into the channel unless the work has completed. select { case <-ctx.Done(): - break - case <-in.locks: - go func(i int) { - // release lock back into pool - defer func() { - in.locks <- struct{}{} - }() - - res, err := fn(queries[i]) - response := resp{ - i: i, - res: res, - err: err, - } - - // Feed the result into the channel unless the work has completed. - select { - case <-ctx.Done(): - case ch <- response: - } - }(i) + case ch <- response: + } + return err + }) + if err != nil { + ch <- logql.Resp{ + I: -1, + Err: err, } } + close(ch) }() acc := newDownstreamAccumulator(queries[0].Params, len(queries)) - for i := 0; i < len(queries); i++ { - select { - case <-ctx.Done(): - return nil, ctx.Err() - case resp := <-ch: - if resp.err != nil { - return nil, resp.err - } - if err := acc.Accumulate(ctx, resp.i, resp.res); err != nil { - return nil, err - } + for resp := range ch { + if resp.Err != nil { + return nil, resp.Err + } + if err := acc.Accumulate(ctx, resp.I, resp.Res); err != nil { + return nil, err } } return acc.Result(), nil @@ -222,8 +211,8 @@ func sampleStreamToVector(streams []queryrangebase.SampleStream) parser.Value { return xs } -// downstreamAccumulator is one of two variants: -// a logsAccumulator or a bufferedAccumulator. +// downstreamAccumulator is one of three variants: +// a logsAccumulator, a bufferedAccumulator, or a quantileSketchAccumulator. // Which variant is detected on the first call to Accumulate. // Metric queries, which are generally small payloads, are buffered // since the memory overhead is negligible. @@ -232,6 +221,7 @@ func sampleStreamToVector(streams []queryrangebase.SampleStream) parser.Value { // accumulate the results into a logsAccumulator, discarding values // over the limit to keep memory pressure down while other subqueries // are executing. +// Sharded probabilistic quantile query results are merged as they come in. type downstreamAccumulator struct { acc resultAccumulator params logql.Params @@ -248,7 +238,8 @@ func newDownstreamAccumulator(params logql.Params, nQueries int) *downstreamAccu } func (a *downstreamAccumulator) build(acc logqlmodel.Result) { - if acc.Data.Type() == logqlmodel.ValueTypeStreams { + switch acc.Data.Type() { + case logqlmodel.ValueTypeStreams: // the stream accumulator stores a heap with reversed order // from the results we expect, so we need to reverse the direction @@ -258,8 +249,9 @@ func (a *downstreamAccumulator) build(acc logqlmodel.Result) { } a.acc = newStreamAccumulator(direction, int(a.params.Limit())) - - } else { + case logql.QuantileSketchMatrixType: + a.acc = newQuantileSketchAccumulator() + default: a.acc = &bufferedAccumulator{ results: make([]logqlmodel.Result, a.n), } @@ -297,6 +289,36 @@ func (a *bufferedAccumulator) Result() []logqlmodel.Result { return a.results } +type quantileSketchAccumulator struct { + matrix logql.ProbabilisticQuantileMatrix +} + +func newQuantileSketchAccumulator() *quantileSketchAccumulator { + return &quantileSketchAccumulator{} +} + +func (a *quantileSketchAccumulator) Accumulate(res logqlmodel.Result, _ int) error { + if res.Data.Type() != logql.QuantileSketchMatrixType { + return fmt.Errorf("unexpected matrix data type: got (%s), want (%s)", res.Data.Type(), logql.QuantileSketchMatrixType) + } + data, ok := res.Data.(logql.ProbabilisticQuantileMatrix) + if !ok { + return fmt.Errorf("unexpected matrix type: got (%T), want (ProbabilisticQuantileMatrix)", res.Data) + } + if a.matrix == nil { + a.matrix = data + return nil + } + + var err error + a.matrix, err = a.matrix.Merge(data) + return err +} + +func (a *quantileSketchAccumulator) Result() []logqlmodel.Result { + return []logqlmodel.Result{{Data: a.matrix}} +} + // heap impl for keeping only the top n results across m streams // importantly, accumulatedStreams is _bounded_, so it will only // store the top `limit` results across all streams. diff --git a/pkg/querier/queryrange/downstreamer_test.go b/pkg/querier/queryrange/downstreamer_test.go index e453f03d9a..007166c30c 100644 --- a/pkg/querier/queryrange/downstreamer_test.go +++ b/pkg/querier/queryrange/downstreamer_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "math/rand" "strconv" "strings" "sync" @@ -18,6 +19,7 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/sketch" "github.com/grafana/loki/pkg/logql/syntax" "github.com/grafana/loki/pkg/logqlmodel" "github.com/grafana/loki/pkg/logqlmodel/stats" @@ -598,3 +600,93 @@ func TestDownstreamAccumulatorMultiMerge(t *testing.T) { }) } } + +func BenchmarkAccumulator(b *testing.B) { + + // dummy params. Only need to populate direction & limit + lim := 30 + params, err := logql.NewLiteralParams( + `{app="foo"}`, time.Time{}, time.Time{}, 0, 0, logproto.BACKWARD, uint32(lim), nil, + ) + require.NoError(b, err) + + for acc, tc := range map[string]struct { + results []logqlmodel.Result + params logql.Params + }{ + "streams": { + newStreamResults(), + params, + }, + "quantile sketches": { + newQuantileSketchResults(), + params, + }, + } { + b.Run(acc, func(b *testing.B) { + b.ResetTimer() + b.ReportAllocs() + for n := 0; n < b.N; n++ { + + acc := newDownstreamAccumulator(params, len(tc.results)) + for i, r := range tc.results { + err := acc.Accumulate(context.Background(), i, r) + require.Nil(b, err) + } + + acc.Result() + } + }) + } +} + +func newStreamResults() []logqlmodel.Result { + nQueries := 50 + delta := 100 // 10 entries per stream, 1s apart + streamsPerQuery := 50 + + results := make([]logqlmodel.Result, nQueries) + for i := 0; i < nQueries; i++ { + start := i * delta + end := start + delta + streams := newStreams(time.Unix(int64(start), 0), time.Unix(int64(end), 0), time.Second, streamsPerQuery, logproto.BACKWARD) + var res logqlmodel.Streams + for i := range streams { + res = append(res, *streams[i]) + } + results[i] = logqlmodel.Result{Data: res} + + } + + return results +} + +func newQuantileSketchResults() []logqlmodel.Result { + results := make([]logqlmodel.Result, 100) + + for r := range results { + vectors := make([]logql.ProbabilisticQuantileVector, 10) + for i := range vectors { + vectors[i] = make(logql.ProbabilisticQuantileVector, 10) + for j := range vectors[i] { + vectors[i][j] = logql.ProbabilisticQuantileSample{ + T: int64(i), + F: newRandomSketch(), + Metric: []labels.Label{{Name: "foo", Value: fmt.Sprintf("bar-%d", j)}}, + } + } + } + results[r] = logqlmodel.Result{Data: logql.ProbabilisticQuantileMatrix(vectors)} + } + + return results +} + +func newRandomSketch() sketch.QuantileSketch { + r := rand.New(rand.NewSource(42)) + s := sketch.NewDDSketch() + for i := 0; i < 1000; i++ { + _ = s.Add(r.Float64()) + } + return s +}