|
|
|
@ -20,8 +20,10 @@ const ( |
|
|
|
|
QuantileSketchMatrixType = "QuantileSketchMatrix" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
type ProbabilisticQuantileVector []ProbabilisticQuantileSample |
|
|
|
|
type ProbabilisticQuantileMatrix []ProbabilisticQuantileVector |
|
|
|
|
type ( |
|
|
|
|
ProbabilisticQuantileVector []ProbabilisticQuantileSample |
|
|
|
|
ProbabilisticQuantileMatrix []ProbabilisticQuantileVector |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
var streamHashPool = sync.Pool{ |
|
|
|
|
New: func() interface{} { return make(map[uint64]int) }, |
|
|
|
@ -177,7 +179,8 @@ func (e *QuantileSketchStepEvaluator) Explain(parent Node) { |
|
|
|
|
|
|
|
|
|
func newQuantileSketchIterator( |
|
|
|
|
it iter.PeekingSampleIterator, |
|
|
|
|
selRange, step, start, end, offset int64) RangeVectorIterator { |
|
|
|
|
selRange, step, start, end, offset int64, |
|
|
|
|
) RangeVectorIterator { |
|
|
|
|
inner := &batchRangeVectorIterator{ |
|
|
|
|
iter: it, |
|
|
|
|
step: step, |
|
|
|
@ -343,70 +346,6 @@ func (*QuantileSketchMatrixStepEvaluator) Explain(parent Node) { |
|
|
|
|
parent.Child("QuantileSketchMatrix") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// QuantileSketchMergeStepEvaluator merges multiple quantile sketches into one for each
|
|
|
|
|
// step.
|
|
|
|
|
type QuantileSketchMergeStepEvaluator struct { |
|
|
|
|
evaluators []StepEvaluator |
|
|
|
|
err error |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func NewQuantileSketchMergeStepEvaluator(evaluators []StepEvaluator) *QuantileSketchMergeStepEvaluator { |
|
|
|
|
return &QuantileSketchMergeStepEvaluator{ |
|
|
|
|
evaluators: evaluators, |
|
|
|
|
err: nil, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (e *QuantileSketchMergeStepEvaluator) Next() (bool, int64, StepResult) { |
|
|
|
|
ok, ts, r := e.evaluators[0].Next() |
|
|
|
|
var cur ProbabilisticQuantileVector |
|
|
|
|
if ok { |
|
|
|
|
cur = r.QuantileSketchVec() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if len(e.evaluators) == 1 { |
|
|
|
|
return ok, ts, cur |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for _, eval := range e.evaluators[1:] { |
|
|
|
|
ok, nextTs, vec := eval.Next() |
|
|
|
|
if ok { |
|
|
|
|
if cur == nil { |
|
|
|
|
cur = vec.QuantileSketchVec() |
|
|
|
|
} else { |
|
|
|
|
if ts != nextTs { |
|
|
|
|
e.err = fmt.Errorf("timestamps of sketches differ: %d!=%d", ts, nextTs) |
|
|
|
|
return false, 0, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
_, e.err = cur.Merge(vec.QuantileSketchVec()) |
|
|
|
|
if e.err != nil { |
|
|
|
|
return false, 0, nil |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return ok, ts, cur |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (*QuantileSketchMergeStepEvaluator) Close() error { return nil } |
|
|
|
|
|
|
|
|
|
func (e *QuantileSketchMergeStepEvaluator) Error() error { return e.err } |
|
|
|
|
|
|
|
|
|
func (e *QuantileSketchMergeStepEvaluator) Explain(parent Node) { |
|
|
|
|
b := parent.Child("QuantileSketchMerge") |
|
|
|
|
if len(e.evaluators) < MaxChildrenDisplay { |
|
|
|
|
for _, child := range e.evaluators { |
|
|
|
|
child.Explain(b) |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
e.evaluators[0].Explain(b) |
|
|
|
|
b.Child("...") |
|
|
|
|
e.evaluators[len(e.evaluators)-1].Explain(b) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// QuantileSketchVectorStepEvaluator evaluates a quantile sketch into a
|
|
|
|
|
// promql.Vector.
|
|
|
|
|
type QuantileSketchVectorStepEvaluator struct { |
|
|
|
|