diff --git a/pkg/logql/quantile_over_time_sketch.go b/pkg/logql/quantile_over_time_sketch.go index 81cd3369e3..a14bf303ab 100644 --- a/pkg/logql/quantile_over_time_sketch.go +++ b/pkg/logql/quantile_over_time_sketch.go @@ -20,8 +20,10 @@ const ( QuantileSketchMatrixType = "QuantileSketchMatrix" ) -type ProbabilisticQuantileVector []ProbabilisticQuantileSample -type ProbabilisticQuantileMatrix []ProbabilisticQuantileVector +type ( + ProbabilisticQuantileVector []ProbabilisticQuantileSample + ProbabilisticQuantileMatrix []ProbabilisticQuantileVector +) var streamHashPool = sync.Pool{ New: func() interface{} { return make(map[uint64]int) }, @@ -177,7 +179,8 @@ func (e *QuantileSketchStepEvaluator) Explain(parent Node) { func newQuantileSketchIterator( it iter.PeekingSampleIterator, - selRange, step, start, end, offset int64) RangeVectorIterator { + selRange, step, start, end, offset int64, +) RangeVectorIterator { inner := &batchRangeVectorIterator{ iter: it, step: step, @@ -343,70 +346,6 @@ func (*QuantileSketchMatrixStepEvaluator) Explain(parent Node) { parent.Child("QuantileSketchMatrix") } -// QuantileSketchMergeStepEvaluator merges multiple quantile sketches into one for each -// step. -type QuantileSketchMergeStepEvaluator struct { - evaluators []StepEvaluator - err error -} - -func NewQuantileSketchMergeStepEvaluator(evaluators []StepEvaluator) *QuantileSketchMergeStepEvaluator { - return &QuantileSketchMergeStepEvaluator{ - evaluators: evaluators, - err: nil, - } -} - -func (e *QuantileSketchMergeStepEvaluator) Next() (bool, int64, StepResult) { - ok, ts, r := e.evaluators[0].Next() - var cur ProbabilisticQuantileVector - if ok { - cur = r.QuantileSketchVec() - } - - if len(e.evaluators) == 1 { - return ok, ts, cur - } - - for _, eval := range e.evaluators[1:] { - ok, nextTs, vec := eval.Next() - if ok { - if cur == nil { - cur = vec.QuantileSketchVec() - } else { - if ts != nextTs { - e.err = fmt.Errorf("timestamps of sketches differ: %d!=%d", ts, nextTs) - return false, 0, nil - } - - _, e.err = cur.Merge(vec.QuantileSketchVec()) - if e.err != nil { - return false, 0, nil - } - } - } - } - - return ok, ts, cur -} - -func (*QuantileSketchMergeStepEvaluator) Close() error { return nil } - -func (e *QuantileSketchMergeStepEvaluator) Error() error { return e.err } - -func (e *QuantileSketchMergeStepEvaluator) Explain(parent Node) { - b := parent.Child("QuantileSketchMerge") - if len(e.evaluators) < MaxChildrenDisplay { - for _, child := range e.evaluators { - child.Explain(b) - } - } else { - e.evaluators[0].Explain(b) - b.Child("...") - e.evaluators[len(e.evaluators)-1].Explain(b) - } -} - // QuantileSketchVectorStepEvaluator evaluates a quantile sketch into a // promql.Vector. type QuantileSketchVectorStepEvaluator struct {