Approximate `quantile_over_time` (#10417)
**What this PR does / why we need it**:
This change shards `quantile_over_time` queries using t-digest or
DDSketch approximations. It can be enabled with `querier.shard_aggregations=quantile_over_time`.
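Sharding is possible because both sketch types are mergeable: each shard builds a sketch over its own samples, and the frontend merges the shards' sketches before extracting the quantile. Below is a minimal, standalone illustration of that property (not code from this PR), assuming only the `sketch` package API the change relies on (`NewDDSketch`, `Add`, `Merge`, `Quantile`):

```go
package main

import (
	"fmt"

	"github.com/grafana/loki/pkg/logql/sketch"
)

func main() {
	// Pretend each of these sketches was built by a different query shard.
	left, right := sketch.NewDDSketch(), sketch.NewDDSketch()
	for i := 0; i < 1000; i++ {
		left.Add(float64(i))         //nolint:errcheck
		right.Add(float64(i + 1000)) //nolint:errcheck
	}

	// Merging folds the right sketch into the left one, so a quantile over
	// the union of both shards can be answered from a single sketch.
	if _, err := left.Merge(right); err != nil {
		panic(err)
	}

	p99, _ := left.Quantile(0.99)
	fmt.Printf("approximate p99: %f\n", p99)
}
```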
**Outstanding**
- [x] Replace generic return type of `StepEvaluator` with interface
`StepResult`.
- [x] Send mapped query with quantile sketch expression from frontend to
querier over the wire.
- [x] Serialize sketches (a round-trip example follows this list). See
https://github.com/influxdata/tdigest/issues/34
- [x] Add feature flag.
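For the frontend/querier leg, sketch results are converted to protobuf and back. The following is a hedged, standalone round-trip sketch of that path using the `ToProto`/`ProbabilisticQuantileMatrixFromProto` helpers added in this PR; it mirrors the serialization test in the diff rather than the actual wire code:

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"

	"github.com/grafana/loki/pkg/logql"
	"github.com/grafana/loki/pkg/logql/sketch"
)

func main() {
	s := sketch.NewDDSketch()
	s.Add(0.42) //nolint:errcheck

	matrix := logql.ProbabilisticQuantileMatrix{
		logql.ProbabilisticQuantileVector{
			{T: 0, F: s, Metric: labels.Labels{{Name: "foo", Value: "bar"}}},
		},
	}

	// What one side puts on the wire...
	proto := matrix.ToProto()

	// ...and what the other side reconstructs from it.
	restored, err := logql.ProbabilisticQuantileMatrixFromProto(proto)
	if err != nil {
		panic(err)
	}
	fmt.Println(restored[0][0].Metric)
}
```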
**Checklist**
- [x] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [x] Documentation added
- [x] Tests updated
- [x] `CHANGELOG.md` updated
- [x] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [x] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e3ec)
---------
Signed-off-by: Callum Styan <callumstyan@gmail.com>
Co-authored-by: Callum Styan <callumstyan@gmail.com>
pull/11424/head
parent 5e3496739c
commit f67fff3eb2
@@ -0,0 +1,413 @@
package logql

import (
	"fmt"
	"time"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/promql"
	promql_parser "github.com/prometheus/prometheus/promql/parser"

	"github.com/grafana/loki/pkg/iter"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/grafana/loki/pkg/logql/sketch"
	"github.com/grafana/loki/pkg/logqlmodel"
)

const (
	QuantileSketchMatrixType = "QuantileSketchMatrix"
)

type ProbabilisticQuantileVector []ProbabilisticQuantileSample
type ProbabilisticQuantileMatrix []ProbabilisticQuantileVector

func (q ProbabilisticQuantileVector) Merge(right ProbabilisticQuantileVector) (ProbabilisticQuantileVector, error) {
	// Map from labels hash to vector index.
	groups := make(map[uint64]int)
	for i, sample := range q {
		groups[sample.Metric.Hash()] = i
	}

	for _, sample := range right {
		i, ok := groups[sample.Metric.Hash()]
		if !ok {
			q = append(q, sample)
			continue
		}

		_, err := q[i].F.Merge(sample.F)
		if err != nil {
			return q, err
		}
	}

	return q, nil
}

func (ProbabilisticQuantileVector) SampleVector() promql.Vector {
	return promql.Vector{}
}

func (q ProbabilisticQuantileVector) QuantileSketchVec() ProbabilisticQuantileVector {
	return q
}

func (q ProbabilisticQuantileVector) ToProto() *logproto.QuantileSketchVector {
	samples := make([]*logproto.QuantileSketchSample, len(q))
	for i, sample := range q {
		samples[i] = sample.ToProto()
	}
	return &logproto.QuantileSketchVector{Samples: samples}
}

func ProbabilisticQuantileVectorFromProto(proto *logproto.QuantileSketchVector) (ProbabilisticQuantileVector, error) {
	out := make([]ProbabilisticQuantileSample, len(proto.Samples))
	var s ProbabilisticQuantileSample
	var err error
	for i, sample := range proto.Samples {
		s, err = probabilisticQuantileSampleFromProto(sample)
		if err != nil {
			return ProbabilisticQuantileVector{}, err
		}
		out[i] = s
	}
	return out, nil
}

func (ProbabilisticQuantileMatrix) String() string {
	return "QuantileSketchMatrix()"
}

func (ProbabilisticQuantileMatrix) Type() promql_parser.ValueType { return QuantileSketchMatrixType }

func (m ProbabilisticQuantileMatrix) ToProto() *logproto.QuantileSketchMatrix {
	values := make([]*logproto.QuantileSketchVector, len(m))
	for i, vec := range m {
		values[i] = vec.ToProto()
	}
	return &logproto.QuantileSketchMatrix{Values: values}
}

func ProbabilisticQuantileMatrixFromProto(proto *logproto.QuantileSketchMatrix) (ProbabilisticQuantileMatrix, error) {
	out := make([]ProbabilisticQuantileVector, len(proto.Values))
	var s ProbabilisticQuantileVector
	var err error
	for i, v := range proto.Values {
		s, err = ProbabilisticQuantileVectorFromProto(v)
		if err != nil {
			return ProbabilisticQuantileMatrix{}, err
		}
		out[i] = s
	}
	return out, nil
}

type QuantileSketchStepEvaluator struct {
	iter RangeVectorIterator

	err error
}

func (e *QuantileSketchStepEvaluator) Next() (bool, int64, StepResult) {
	next := e.iter.Next()
	if !next {
		return false, 0, ProbabilisticQuantileVector{}
	}
	ts, r := e.iter.At()
	vec := r.QuantileSketchVec()
	for _, s := range vec {
		// Errors are not allowed in metrics unless they've been specifically requested.
		if s.Metric.Has(logqlmodel.ErrorLabel) && s.Metric.Get(logqlmodel.PreserveErrorLabel) != "true" {
			e.err = logqlmodel.NewPipelineErr(s.Metric)
			return false, 0, ProbabilisticQuantileVector{}
		}
	}
	return true, ts, vec
}

func (e *QuantileSketchStepEvaluator) Close() error { return e.iter.Close() }

func (e *QuantileSketchStepEvaluator) Error() error {
	if e.err != nil {
		return e.err
	}
	return e.iter.Error()
}

func (e *QuantileSketchStepEvaluator) Explain(parent Node) {
	parent.Child("QuantileSketch")
}

func newQuantileSketchIterator(
	it iter.PeekingSampleIterator,
	selRange, step, start, end, offset int64) RangeVectorIterator {
	inner := &batchRangeVectorIterator{
		iter:     it,
		step:     step,
		end:      end,
		selRange: selRange,
		metrics:  map[string]labels.Labels{},
		window:   map[string]*promql.Series{},
		agg:      nil,
		current:  start - step, // first loop iteration will set it to start
		offset:   offset,
	}
	return &quantileSketchBatchRangeVectorIterator{
		batchRangeVectorIterator: inner,
	}
}

type ProbabilisticQuantileSample struct {
	T int64
	F sketch.QuantileSketch

	Metric labels.Labels
}

func (q ProbabilisticQuantileSample) ToProto() *logproto.QuantileSketchSample {
	metric := make([]*logproto.LabelPair, len(q.Metric))
	for i, m := range q.Metric {
		metric[i] = &logproto.LabelPair{Name: m.Name, Value: m.Value}
	}

	sketch := q.F.ToProto()

	return &logproto.QuantileSketchSample{
		F:           sketch,
		TimestampMs: q.T,
		Metric:      metric,
	}
}

func probabilisticQuantileSampleFromProto(proto *logproto.QuantileSketchSample) (ProbabilisticQuantileSample, error) {
	s, err := sketch.QuantileSketchFromProto(proto.F)
	if err != nil {
		return ProbabilisticQuantileSample{}, err
	}
	out := ProbabilisticQuantileSample{
		T:      proto.TimestampMs,
		F:      s,
		Metric: make(labels.Labels, len(proto.Metric)),
	}

	for i, p := range proto.Metric {
		out.Metric[i] = labels.Label{Name: p.Name, Value: p.Value}
	}

	return out, nil
}

type quantileSketchBatchRangeVectorIterator struct {
	*batchRangeVectorIterator
	at []ProbabilisticQuantileSample
}

func (r *quantileSketchBatchRangeVectorIterator) At() (int64, StepResult) {
	if r.at == nil {
		r.at = make([]ProbabilisticQuantileSample, 0, len(r.window))
	}
	r.at = r.at[:0]
	// Convert ts from nanoseconds to milliseconds, as the iterator works with nanoseconds.
	ts := r.current/1e+6 + r.offset/1e+6
	for _, series := range r.window {
		r.at = append(r.at, ProbabilisticQuantileSample{
			F:      r.agg(series.Floats),
			T:      ts,
			Metric: series.Metric,
		})
	}
	return ts, ProbabilisticQuantileVector(r.at)
}

func (r *quantileSketchBatchRangeVectorIterator) agg(samples []promql.FPoint) sketch.QuantileSketch {
	s := sketch.NewDDSketch()
	for _, v := range samples {
		// The sketch from the underlying sketch package we are using
		// cannot return an error when calling Add.
		s.Add(v.F) //nolint:errcheck
	}
	return s
}

// JoinQuantileSketchVector joins the results from stepEvaluator into a ProbabilisticQuantileMatrix.
func JoinQuantileSketchVector(next bool, r StepResult, stepEvaluator StepEvaluator) (promql_parser.Value, error) {
	vec := r.QuantileSketchVec()
	if stepEvaluator.Error() != nil {
		return nil, stepEvaluator.Error()
	}

	result := make([]ProbabilisticQuantileVector, 0)

	for next {
		result = append(result, vec)

		next, _, r = stepEvaluator.Next()
		vec = r.QuantileSketchVec()
		if stepEvaluator.Error() != nil {
			return nil, stepEvaluator.Error()
		}
	}

	return ProbabilisticQuantileMatrix(result), stepEvaluator.Error()
}

// QuantileSketchMatrixStepEvaluator steps through a matrix of quantile sketch
// vectors, i.e. t-digest or DDSketch structures per time step.
type QuantileSketchMatrixStepEvaluator struct {
	start, end, ts time.Time
	step           time.Duration
	m              ProbabilisticQuantileMatrix
}

func NewQuantileSketchMatrixStepEvaluator(m ProbabilisticQuantileMatrix, params Params) *QuantileSketchMatrixStepEvaluator {
	var (
		start = params.Start()
		end   = params.End()
		step  = params.Step()
	)
	return &QuantileSketchMatrixStepEvaluator{
		start: start,
		end:   end,
		ts:    start.Add(-step), // will be corrected on first Next() call
		step:  step,
		m:     m,
	}
}

func (m *QuantileSketchMatrixStepEvaluator) Next() (bool, int64, StepResult) {
	m.ts = m.ts.Add(m.step)
	if m.ts.After(m.end) {
		return false, 0, nil
	}

	ts := m.ts.UnixNano() / int64(time.Millisecond)

	if len(m.m) == 0 {
		return false, 0, nil
	}

	vec := m.m[0]

	// Reset for next step.
	m.m = m.m[1:]

	return true, ts, vec
}

func (*QuantileSketchMatrixStepEvaluator) Close() error { return nil }

func (*QuantileSketchMatrixStepEvaluator) Error() error { return nil }

func (*QuantileSketchMatrixStepEvaluator) Explain(parent Node) {
	parent.Child("QuantileSketchMatrix")
}

// QuantileSketchMergeStepEvaluator merges multiple quantile sketches into one for each
// step.
type QuantileSketchMergeStepEvaluator struct {
	evaluators []StepEvaluator
	err        error
}

func NewQuantileSketchMergeStepEvaluator(evaluators []StepEvaluator) *QuantileSketchMergeStepEvaluator {
	return &QuantileSketchMergeStepEvaluator{
		evaluators: evaluators,
		err:        nil,
	}
}

func (e *QuantileSketchMergeStepEvaluator) Next() (bool, int64, StepResult) {
	ok, ts, r := e.evaluators[0].Next()
	var cur ProbabilisticQuantileVector
	if ok {
		cur = r.QuantileSketchVec()
	}

	if len(e.evaluators) == 1 {
		return ok, ts, cur
	}

	for _, eval := range e.evaluators[1:] {
		ok, nextTs, vec := eval.Next()
		if ok {
			if cur == nil {
				cur = vec.QuantileSketchVec()
			} else {
				if ts != nextTs {
					e.err = fmt.Errorf("timestamps of sketches differ: %d!=%d", ts, nextTs)
					return false, 0, nil
				}

				_, e.err = cur.Merge(vec.QuantileSketchVec())
				if e.err != nil {
					return false, 0, nil
				}
			}
		}
	}

	return ok, ts, cur
}

func (*QuantileSketchMergeStepEvaluator) Close() error { return nil }

func (e *QuantileSketchMergeStepEvaluator) Error() error { return e.err }

func (e *QuantileSketchMergeStepEvaluator) Explain(parent Node) {
	b := parent.Child("QuantileSketchMerge")
	if len(e.evaluators) < MaxChildrenDisplay {
		for _, child := range e.evaluators {
			child.Explain(b)
		}
	} else {
		e.evaluators[0].Explain(b)
		b.Child("...")
		e.evaluators[len(e.evaluators)-1].Explain(b)
	}
}

// QuantileSketchVectorStepEvaluator evaluates a quantile sketch into a
// promql.Vector.
type QuantileSketchVectorStepEvaluator struct {
	inner    StepEvaluator
	quantile float64
}

var _ StepEvaluator = NewQuantileSketchVectorStepEvaluator(nil, 0)

func NewQuantileSketchVectorStepEvaluator(inner StepEvaluator, quantile float64) *QuantileSketchVectorStepEvaluator {
	return &QuantileSketchVectorStepEvaluator{
		inner:    inner,
		quantile: quantile,
	}
}

func (e *QuantileSketchVectorStepEvaluator) Next() (bool, int64, StepResult) {
	ok, ts, r := e.inner.Next()
	quantileSketchVec := r.QuantileSketchVec()

	vec := make(promql.Vector, len(quantileSketchVec))

	for i, quantileSketch := range quantileSketchVec {
		f, _ := quantileSketch.F.Quantile(e.quantile)

		vec[i] = promql.Sample{
			T:      quantileSketch.T,
			F:      f,
			Metric: quantileSketch.Metric,
		}
	}

	return ok, ts, SampleVector(vec)
}

func (*QuantileSketchVectorStepEvaluator) Close() error { return nil }

func (*QuantileSketchVectorStepEvaluator) Error() error { return nil }

func (e *QuantileSketchVectorStepEvaluator) Explain(parent Node) {
	b := parent.Child("QuantileSketchVector")
	e.inner.Explain(b)
}
@@ -0,0 +1,109 @@
package logql

import (
	"errors"
	"testing"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/pkg/logproto"
	"github.com/grafana/loki/pkg/logql/sketch"
	"github.com/grafana/loki/pkg/logqlmodel"
)

func TestProbabilisticMQuantileMatrixSerialization(t *testing.T) {
	emptySketch := sketch.NewDDSketch()
	ddsketchBytes := make([]byte, 0)
	emptySketch.Encode(&ddsketchBytes, false)

	matrix := ProbabilisticQuantileMatrix([]ProbabilisticQuantileVector{
		[]ProbabilisticQuantileSample{
			{T: 0, F: emptySketch, Metric: []labels.Label{{Name: "foo", Value: "bar"}}},
		},
	})

	proto := &logproto.QuantileSketchMatrix{
		Values: []*logproto.QuantileSketchVector{
			{
				Samples: []*logproto.QuantileSketchSample{
					{
						TimestampMs: 0,
						F:           &logproto.QuantileSketch{Sketch: &logproto.QuantileSketch_Ddsketch{Ddsketch: ddsketchBytes}},
						Metric:      []*logproto.LabelPair{{Name: "foo", Value: "bar"}},
					},
				},
			},
		},
	}

	actual := matrix.ToProto()
	require.Equal(t, proto, actual)

	_, err := ProbabilisticQuantileMatrixFromProto(actual)
	require.NoError(t, err)
}

func TestQuantileSketchStepEvaluatorError(t *testing.T) {
	iter := errorRangeVectorIterator{
		result: ProbabilisticQuantileVector([]ProbabilisticQuantileSample{
			{T: 43, F: nil, Metric: labels.Labels{{Name: logqlmodel.ErrorLabel, Value: "my error"}}},
		}),
	}
	ev := QuantileSketchStepEvaluator{
		iter: iter,
	}
	ok, _, _ := ev.Next()
	require.False(t, ok)

	err := ev.Error()
	require.ErrorContains(t, err, "my error")
}

func TestJoinQuantileSketchVectorError(t *testing.T) {
	result := ProbabilisticQuantileVector{}
	ev := errorStepEvaluator{
		err: errors.New("could not evaluate"),
	}
	_, err := JoinQuantileSketchVector(true, result, ev)
	require.ErrorContains(t, err, "could not evaluate")
}

type errorRangeVectorIterator struct {
	err    error
	result StepResult
}

func (e errorRangeVectorIterator) Next() bool {
	return e.result != nil
}

func (e errorRangeVectorIterator) At() (int64, StepResult) {
	return 0, e.result
}

func (errorRangeVectorIterator) Close() error {
	return nil
}

func (e errorRangeVectorIterator) Error() error {
	return e.err
}

type errorStepEvaluator struct {
	err error
}

func (errorStepEvaluator) Next() (ok bool, ts int64, r StepResult) {
	return false, 0, nil
}

func (errorStepEvaluator) Close() error {
	return nil
}

func (e errorStepEvaluator) Error() error {
	return e.err
}

func (e errorStepEvaluator) Explain(Node) {}
@@ -1,81 +0,0 @@
package sketch

import (
	"fmt"
	"math/rand"
	"sort"
	"testing"

	"github.com/gogo/protobuf/proto"
	"github.com/prometheus/prometheus/promql"
	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/pkg/logql"
	"github.com/grafana/loki/pkg/logql/vector"
)

func TestQuantiles(t *testing.T) {
	// v controls the distribution of values along the curve; a greater v
	// value means a larger distance between generated values.
	vs := []float64{1.0, 5.0, 10.0}
	// s controls the exponential curve of the distribution:
	// the higher the s value, the faster the drop-off from the max value to lesser values.
	// s must be > 1.0.
	ss := []float64{1.01, 2.0, 3.0, 4.0}

	// T-Digest is too big for 1_000 samples. However, we did not optimize
	// the format for size.
	nSamples := []int{5_000, 10_000, 100_000, 1_000_000}

	factories := []struct {
		newSketch     QuantileSketchFactory
		name          string
		relativeError float64
	}{
		{newSketch: func() QuantileSketch { return NewDDSketch() }, name: "DDSketch", relativeError: 0.02},
		{newSketch: NewTDigestSketch, name: "T-Digest", relativeError: 0.05},
	}

	for _, tc := range factories {
		for _, samplesCount := range nSamples {
			for _, s := range ss {
				for _, v := range vs {
					t.Run(fmt.Sprintf("sketch=%s, s=%.2f, v=%.2f, events=%d", tc.name, s, v, samplesCount), func(t *testing.T) {
						sketch := tc.newSketch()

						r := rand.New(rand.NewSource(42))
						z := rand.NewZipf(r, s, v, 1_000)
						values := make(vector.HeapByMaxValue, 0)
						for i := 0; i < samplesCount; i++ {
							value := float64(z.Uint64())
							values = append(values, promql.Sample{F: value})
							err := sketch.Add(value)
							require.NoError(t, err)
						}
						sort.Sort(values)

						// Size
						var buf []byte
						var err error
						switch s := sketch.(type) {
						case *DDSketchQuantile:
							buf, err = proto.Marshal(s.DDSketch.ToProto())
							require.NoError(t, err)
						case *TDigestQuantile:
							buf, err = proto.Marshal(s.ToProto())
							require.NoError(t, err)
						}
						require.Less(t, len(buf), samplesCount*8)

						// Accuracy
						expected := logql.Quantile(0.99, values)
						actual, err := sketch.Quantile(0.99)
						require.NoError(t, err)
						require.InEpsilonf(t, expected, actual, tc.relativeError, "expected quantile %f, actual quantile %f", expected, actual)
					})
				}
			}
		}
	}
}
@@ -0,0 +1,298 @@
package syntax

import (
	"github.com/prometheus/prometheus/model/labels"

	"github.com/grafana/loki/pkg/logql/log"
)

type cloneVisitor struct {
	cloned Expr
}

var _ RootVisitor = &cloneVisitor{}

func cloneGrouping(g *Grouping) *Grouping {
	copied := &Grouping{
		Without: g.Without,
	}
	if g.Groups != nil {
		copied.Groups = make([]string, len(g.Groups))
		copy(copied.Groups, g.Groups)
	}
	return copied
}

func cloneVectorMatching(v *VectorMatching) *VectorMatching {
	copied := *v
	copy(copied.Include, v.Include)
	copy(copied.MatchingLabels, v.MatchingLabels)

	return &copied
}

func (v *cloneVisitor) VisitBinOp(e *BinOpExpr) {
	lhs := MustClone[SampleExpr](e.SampleExpr)
	rhs := MustClone[SampleExpr](e.RHS)
	copied := &BinOpExpr{
		SampleExpr: lhs,
		RHS:        rhs,
		Op:         e.Op,
	}

	if e.Opts != nil {
		copied.Opts = &BinOpOptions{
			ReturnBool:     e.Opts.ReturnBool,
			VectorMatching: cloneVectorMatching(e.Opts.VectorMatching),
		}
	}

	v.cloned = copied
}

func (v *cloneVisitor) VisitVectorAggregation(e *VectorAggregationExpr) {
	copied := &VectorAggregationExpr{
		Left:      MustClone[SampleExpr](e.Left),
		Params:    e.Params,
		Operation: e.Operation,
	}

	if e.Grouping != nil {
		copied.Grouping = cloneGrouping(e.Grouping)
	}

	v.cloned = copied
}

func (v *cloneVisitor) VisitRangeAggregation(e *RangeAggregationExpr) {
	copied := &RangeAggregationExpr{
		Left:      MustClone[*LogRange](e.Left),
		Operation: e.Operation,
	}

	if e.Grouping != nil {
		copied.Grouping = cloneGrouping(e.Grouping)
	}

	if e.Params != nil {
		tmp := *e.Params
		copied.Params = &tmp
	}

	v.cloned = copied
}

func (v *cloneVisitor) VisitLabelReplace(e *LabelReplaceExpr) {
	left := MustClone[SampleExpr](e.Left)
	v.cloned = mustNewLabelReplaceExpr(left, e.Dst, e.Replacement, e.Src, e.Regex)
}

func (v *cloneVisitor) VisitLiteral(e *LiteralExpr) {
	v.cloned = &LiteralExpr{Val: e.Val}
}

func (v *cloneVisitor) VisitVector(e *VectorExpr) {
	v.cloned = &VectorExpr{Val: e.Val}
}

func (v *cloneVisitor) VisitLogRange(e *LogRange) {
	copied := &LogRange{
		Left:     MustClone[LogSelectorExpr](e.Left),
		Interval: e.Interval,
		Offset:   e.Offset,
	}
	if e.Unwrap != nil {
		copied.Unwrap = &UnwrapExpr{
			Identifier: e.Unwrap.Identifier,
			Operation:  e.Unwrap.Operation,
		}
		if e.Unwrap.PostFilters != nil {
			copied.Unwrap.PostFilters = make([]log.LabelFilterer, len(e.Unwrap.PostFilters))
			for i, f := range e.Unwrap.PostFilters {
				copied.Unwrap.PostFilters[i] = cloneLabelFilterer(f)
			}
		}
	}

	v.cloned = copied
}

func (v *cloneVisitor) VisitMatchers(e *MatchersExpr) {
	copied := &MatchersExpr{
		Mts: make([]*labels.Matcher, len(e.Mts)),
	}
	for i, m := range e.Mts {
		copied.Mts[i] = labels.MustNewMatcher(m.Type, m.Name, m.Value)
	}

	v.cloned = copied
}

func (v *cloneVisitor) VisitPipeline(e *PipelineExpr) {
	copied := &PipelineExpr{
		Left:        MustClone[*MatchersExpr](e.Left),
		MultiStages: make(MultiStageExpr, len(e.MultiStages)),
	}
	for i, s := range e.MultiStages {
		copied.MultiStages[i] = MustClone[StageExpr](s)
	}

	v.cloned = copied
}

func (v *cloneVisitor) VisitDecolorize(*DecolorizeExpr) {
	v.cloned = &DecolorizeExpr{}
}

func (v *cloneVisitor) VisitDropLabels(e *DropLabelsExpr) {
	copied := &DropLabelsExpr{
		dropLabels: make([]log.DropLabel, len(e.dropLabels)),
	}
	for i, l := range e.dropLabels {
		var matcher *labels.Matcher
		if l.Matcher != nil {
			matcher = labels.MustNewMatcher(l.Matcher.Type, l.Matcher.Name, l.Matcher.Value)
		}
		copied.dropLabels[i] = log.NewDropLabel(matcher, l.Name)
	}

	v.cloned = copied
}

func (v *cloneVisitor) VisitJSONExpressionParser(e *JSONExpressionParser) {
	copied := &JSONExpressionParser{
		Expressions: make([]log.LabelExtractionExpr, len(e.Expressions)),
	}
	copy(copied.Expressions, e.Expressions)

	v.cloned = copied
}

func (v *cloneVisitor) VisitKeepLabel(e *KeepLabelsExpr) {
	copied := &KeepLabelsExpr{
		keepLabels: make([]log.KeepLabel, len(e.keepLabels)),
	}
	for i, k := range e.keepLabels {
		copied.keepLabels[i] = log.KeepLabel{
			Name: k.Name,
		}
		if k.Matcher != nil {
			copied.keepLabels[i].Matcher = labels.MustNewMatcher(k.Matcher.Type, k.Matcher.Name, k.Matcher.Value)
		}
	}

	v.cloned = copied
}

func (v *cloneVisitor) VisitLabelFilter(e *LabelFilterExpr) {
	v.cloned = &LabelFilterExpr{
		LabelFilterer: cloneLabelFilterer(e.LabelFilterer),
	}
}

func cloneLabelFilterer(filter log.LabelFilterer) log.LabelFilterer {
	switch concrete := filter.(type) {
	case *log.BinaryLabelFilter:
		return &log.BinaryLabelFilter{
			Left:  cloneLabelFilterer(concrete.Left),
			Right: cloneLabelFilterer(concrete.Right),
			And:   concrete.And,
		}
	case *log.NoopLabelFilter:
		copied := &log.NoopLabelFilter{}
		if concrete.Matcher != nil {
			copied.Matcher = mustNewMatcher(concrete.Type, concrete.Name, concrete.Value)
		}

		return copied
	case *log.BytesLabelFilter:
		return &log.BytesLabelFilter{
			Name:  concrete.Name,
			Value: concrete.Value,
			Type:  concrete.Type,
		}
	case *log.DurationLabelFilter:
		return &log.DurationLabelFilter{
			Name:  concrete.Name,
			Value: concrete.Value,
			Type:  concrete.Type,
		}
	case *log.NumericLabelFilter:
		return &log.NumericLabelFilter{
			Name:  concrete.Name,
			Value: concrete.Value,
			Type:  concrete.Type,
		}
	case *log.StringLabelFilter:
		copied := &log.StringLabelFilter{}
		if concrete.Matcher != nil {
			copied.Matcher = mustNewMatcher(concrete.Type, concrete.Name, concrete.Value)
		}
		return copied
	case *log.LineFilterLabelFilter:
		copied := &log.LineFilterLabelFilter{}
		if concrete.Matcher != nil {
			copied.Matcher = mustNewMatcher(concrete.Type, concrete.Name, concrete.Value)
		}
		return copied
	case *log.IPLabelFilter:
		return log.NewIPLabelFilter(concrete.Pattern, concrete.Label, concrete.Ty)
	}
	return nil
}

func (v *cloneVisitor) VisitLabelFmt(e *LabelFmtExpr) {
	copied := &LabelFmtExpr{
		Formats: make([]log.LabelFmt, len(e.Formats)),
	}
	copy(copied.Formats, e.Formats)
	v.cloned = copied
}

func (v *cloneVisitor) VisitLabelParser(e *LabelParserExpr) {
	v.cloned = &LabelParserExpr{
		Op:    e.Op,
		Param: e.Param,
	}
}

func (v *cloneVisitor) VisitLineFilter(e *LineFilterExpr) {
	copied := &LineFilterExpr{
		Ty:        e.Ty,
		Match:     e.Match,
		Op:        e.Op,
		IsOrChild: e.IsOrChild,
	}

	if e.Left != nil {
		copied.Left = MustClone[*LineFilterExpr](e.Left)
	}

	if e.Or != nil {
		copied.Or = MustClone[*LineFilterExpr](e.Or)
	}

	v.cloned = copied
}

func (v *cloneVisitor) VisitLineFmt(e *LineFmtExpr) {
	v.cloned = &LineFmtExpr{Value: e.Value}
}

func (v *cloneVisitor) VisitLogfmtExpressionParser(e *LogfmtExpressionParser) {
	copied := &LogfmtExpressionParser{
		Expressions: make([]log.LabelExtractionExpr, len(e.Expressions)),
		Strict:      e.Strict,
		KeepEmpty:   e.KeepEmpty,
	}
	copy(copied.Expressions, e.Expressions)

	v.cloned = copied
}

func (v *cloneVisitor) VisitLogfmtParser(e *LogfmtParserExpr) {
	v.cloned = &LogfmtParserExpr{
		Strict:    e.Strict,
		KeepEmpty: e.KeepEmpty,
	}
}
@@ -0,0 +1,114 @@
package syntax

import (
	"strings"
	"testing"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/pkg/logql/log"
)

func TestClone(t *testing.T) {
	tests := map[string]struct {
		query string
	}{
		"simple matchers": {
			query: `{env="prod", app=~"loki.*"}`,
		},
		"simple aggregation": {
			query: `count_over_time({env="prod", app=~"loki.*"}[5m])`,
		},
		"simple aggregation with unwrap": {
			query: `sum_over_time({env="prod", app=~"loki.*"} | unwrap bytes[5m])`,
		},
		"bin op": {
			query: `(count_over_time({env="prod", app=~"loki.*"}[5m]) >= 0)`,
		},
		"label filter": {
			query: `{app="foo"} |= "bar" | json | ( latency>=250ms or ( status_code<500 , status_code>200 ) )`,
		},
		"line filter": {
			query: `{app="foo"} |= "bar" | json |= "500" or "200"`,
		},
		"drop label": {
			query: `{app="foo"} |= "bar" | json | drop latency, status_code="200"`,
		},
		"keep label": {
			query: `{app="foo"} |= "bar" | json | keep latency, status_code="200"`,
		},
		"regexp": {
			query: `{env="prod", app=~"loki.*"} |~ ".*foo.*"`,
		},
		"vector matching": {
			query: `(sum by (cluster)(rate({foo="bar"}[5m])) / ignoring (cluster) count(rate({foo="bar"}[5m])))`,
		},
		"sum over or vector": {
			query: `(sum(count_over_time({foo="bar"}[5m])) or vector(1.000000))`,
		},
		"label replace": {
			query: `label_replace(vector(0.000000),"foo","bar","","")`,
		},
		"filters with bytes": {
			query: `{app="foo"} |= "bar" | json | ( status_code <500 or ( status_code>200 , size>=2.5KiB ) )`,
		},
		"post filter": {
			query: `quantile_over_time(0.99998,{app="foo"} |= "bar" | json | latency >= 250ms or ( status_code < 500 and status_code > 200)
			| line_format "blip{{ .foo }}blop {{.status_code}}" | label_format foo=bar,status_code="buzz{{.bar}}" | unwrap foo
			| __error__ !~".+"[5m]) by (namespace,instance)`,
		},
		"multiple post filters": {
			query: `rate({app="foo"} | json | unwrap foo | latency >= 250ms or bytes > 42B or ( status_code < 500 and status_code > 200) or source = ip("") and user = "me" [1m])`,
		},
		"true filter": {
			query: `{ foo = "bar" } | foo =~".*"`,
		},
	}

	for name, test := range tests {
		t.Run(name, func(t *testing.T) {
			expr, err := ParseExpr(test.query)
			require.NoError(t, err)

			actual, err := Clone[Expr](expr)
			require.NoError(t, err)

			require.Equal(t, expr.Pretty(0), actual.Pretty(0))
		})
	}
}

func TestCloneStringLabelFilter(t *testing.T) {
	expr := newPipelineExpr(
		newMatcherExpr([]*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}),
		MultiStageExpr{
			newLogfmtParserExpr(nil),
			newLabelFilterExpr(&log.StringLabelFilter{Matcher: labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}),
		},
	)
	actual, err := Clone[Expr](expr)
	require.NoError(t, err)

	require.Equal(t, expr.Pretty(0), actual.Pretty(0))
}

func TestCloneParseTestCases(t *testing.T) {
	for _, tc := range ParseTestCases {
		if tc.err == nil {
			t.Run(tc.in, func(t *testing.T) {
				ast, err := ParseExpr(tc.in)
				require.NoError(t, err)
				if strings.Contains(tc.in, "KiB") {
					t.Skipf("Byte roundtrip conversion is broken. '%s' vs '%s'", tc.in, ast.String())
				}

				actual, err := Clone[Expr](ast)
				require.NoError(t, err)

				require.Equal(t, ast.Pretty(0), actual.Pretty(0))
			})
		}
	}
}