diff --git a/model/histogram/float_histogram.go b/model/histogram/float_histogram.go index 75021d2c62..d457d8ab25 100644 --- a/model/histogram/float_histogram.go +++ b/model/histogram/float_histogram.go @@ -18,6 +18,8 @@ import ( "fmt" "math" "strings" + + "github.com/prometheus/prometheus/util/kahansum" ) // FloatHistogram is similar to Histogram but uses float64 for all @@ -353,7 +355,7 @@ func (h *FloatHistogram) Add(other *FloatHistogram) (res *FloatHistogram, counte } counterResetCollision = h.adjustCounterReset(other) if !h.UsesCustomBuckets() { - otherZeroCount := h.reconcileZeroBuckets(other) + otherZeroCount, _ := h.reconcileZeroBuckets(other, nil) h.ZeroCount += otherZeroCount } h.Count += other.Count @@ -374,11 +376,11 @@ func (h *FloatHistogram) Add(other *FloatHistogram) (res *FloatHistogram, counte intersectedBounds := intersectCustomBucketBounds(h.CustomValues, other.CustomValues) // Add with mapping - maps both histograms to intersected layout. - h.PositiveSpans, h.PositiveBuckets = addCustomBucketsWithMismatches( + h.PositiveSpans, h.PositiveBuckets, _ = addCustomBucketsWithMismatches( false, hPositiveSpans, hPositiveBuckets, h.CustomValues, otherPositiveSpans, otherPositiveBuckets, other.CustomValues, - intersectedBounds) + nil, intersectedBounds) h.CustomValues = intersectedBounds } return h, counterResetCollision, nhcbBoundsReconciled, nil @@ -408,6 +410,121 @@ func (h *FloatHistogram) Add(other *FloatHistogram) (res *FloatHistogram, counte return h, counterResetCollision, nhcbBoundsReconciled, nil } +// KahanAdd works like Add but using the Kahan summation algorithm to minimize numerical errors. +// c is a histogram holding the Kahan compensation term. It is modified in-place if non-nil. +// If c is nil, a new compensation histogram is created inside the function. In this case, +// the caller must use the returned updatedC, because the original c variable is not modified. +func (h *FloatHistogram) KahanAdd(other, c *FloatHistogram) (updatedC *FloatHistogram, counterResetCollision, nhcbBoundsReconciled bool, err error) { + if err := h.checkSchemaAndBounds(other); err != nil { + return nil, false, false, err + } + + counterResetCollision = h.adjustCounterReset(other) + + if c == nil { + c = h.newCompensationHistogram() + } + if !h.UsesCustomBuckets() { + otherZeroCount, otherCZeroCount := h.reconcileZeroBuckets(other, c) + h.ZeroCount, c.ZeroCount = kahansum.Inc(otherZeroCount, h.ZeroCount, c.ZeroCount) + h.ZeroCount, c.ZeroCount = kahansum.Inc(otherCZeroCount, h.ZeroCount, c.ZeroCount) + } + h.Count, c.Count = kahansum.Inc(other.Count, h.Count, c.Count) + h.Sum, c.Sum = kahansum.Inc(other.Sum, h.Sum, c.Sum) + + var ( + hPositiveSpans = h.PositiveSpans + hPositiveBuckets = h.PositiveBuckets + otherPositiveSpans = other.PositiveSpans + otherPositiveBuckets = other.PositiveBuckets + cPositiveBuckets = c.PositiveBuckets + ) + + if h.UsesCustomBuckets() { + if CustomBucketBoundsMatch(h.CustomValues, other.CustomValues) { + h.PositiveSpans, h.PositiveBuckets, c.PositiveBuckets = kahanAddBuckets( + h.Schema, h.ZeroThreshold, false, + hPositiveSpans, hPositiveBuckets, + otherPositiveSpans, otherPositiveBuckets, + cPositiveBuckets, nil, + ) + } else { + nhcbBoundsReconciled = true + intersectedBounds := intersectCustomBucketBounds(h.CustomValues, other.CustomValues) + + // Add with mapping - maps both histograms to intersected layout. 
+ h.PositiveSpans, h.PositiveBuckets, c.PositiveBuckets = addCustomBucketsWithMismatches( + false, + hPositiveSpans, hPositiveBuckets, h.CustomValues, + otherPositiveSpans, otherPositiveBuckets, other.CustomValues, + cPositiveBuckets, intersectedBounds) + h.CustomValues = intersectedBounds + c.CustomValues = intersectedBounds + } + c.PositiveSpans = h.PositiveSpans + return c, counterResetCollision, nhcbBoundsReconciled, nil + } + + otherC := other.newCompensationHistogram() + + var ( + hNegativeSpans = h.NegativeSpans + hNegativeBuckets = h.NegativeBuckets + otherNegativeSpans = other.NegativeSpans + otherNegativeBuckets = other.NegativeBuckets + cNegativeBuckets = c.NegativeBuckets + otherCPositiveBuckets = otherC.PositiveBuckets + otherCNegativeBuckets = otherC.NegativeBuckets + ) + + switch { + case other.Schema < h.Schema: + hPositiveSpans, hPositiveBuckets, cPositiveBuckets = kahanReduceResolution( + hPositiveSpans, hPositiveBuckets, cPositiveBuckets, + h.Schema, other.Schema, + true, + ) + hNegativeSpans, hNegativeBuckets, cNegativeBuckets = kahanReduceResolution( + hNegativeSpans, hNegativeBuckets, cNegativeBuckets, + h.Schema, other.Schema, + true, + ) + h.Schema = other.Schema + + case other.Schema > h.Schema: + otherPositiveSpans, otherPositiveBuckets, otherCPositiveBuckets = kahanReduceResolution( + otherPositiveSpans, otherPositiveBuckets, otherCPositiveBuckets, + other.Schema, h.Schema, + false, + ) + otherNegativeSpans, otherNegativeBuckets, otherCNegativeBuckets = kahanReduceResolution( + otherNegativeSpans, otherNegativeBuckets, otherCNegativeBuckets, + other.Schema, h.Schema, + false, + ) + } + + h.PositiveSpans, h.PositiveBuckets, c.PositiveBuckets = kahanAddBuckets( + h.Schema, h.ZeroThreshold, false, + hPositiveSpans, hPositiveBuckets, + otherPositiveSpans, otherPositiveBuckets, + cPositiveBuckets, otherCPositiveBuckets, + ) + h.NegativeSpans, h.NegativeBuckets, c.NegativeBuckets = kahanAddBuckets( + h.Schema, h.ZeroThreshold, false, + hNegativeSpans, hNegativeBuckets, + otherNegativeSpans, otherNegativeBuckets, + cNegativeBuckets, otherCNegativeBuckets, + ) + + c.Schema = h.Schema + c.ZeroThreshold = h.ZeroThreshold + c.PositiveSpans = h.PositiveSpans + c.NegativeSpans = h.NegativeSpans + + return c, counterResetCollision, nhcbBoundsReconciled, nil +} + // Sub works like Add but subtracts the other histogram. It uses the same logic // to adjust the counter reset hint. This is useful where this method is used // for incremental mean calculation. However, if it is used for the actual "-" @@ -419,7 +536,7 @@ func (h *FloatHistogram) Sub(other *FloatHistogram) (res *FloatHistogram, counte } counterResetCollision = h.adjustCounterReset(other) if !h.UsesCustomBuckets() { - otherZeroCount := h.reconcileZeroBuckets(other) + otherZeroCount, _ := h.reconcileZeroBuckets(other, nil) h.ZeroCount -= otherZeroCount } h.Count -= other.Count @@ -440,11 +557,11 @@ func (h *FloatHistogram) Sub(other *FloatHistogram) (res *FloatHistogram, counte intersectedBounds := intersectCustomBucketBounds(h.CustomValues, other.CustomValues) // Subtract with mapping - maps both histograms to intersected layout. 
- h.PositiveSpans, h.PositiveBuckets = addCustomBucketsWithMismatches( + h.PositiveSpans, h.PositiveBuckets, _ = addCustomBucketsWithMismatches( true, hPositiveSpans, hPositiveBuckets, h.CustomValues, otherPositiveSpans, otherPositiveBuckets, other.CustomValues, - intersectedBounds) + nil, intersectedBounds) h.CustomValues = intersectedBounds } return h, counterResetCollision, nhcbBoundsReconciled, nil @@ -576,15 +693,28 @@ func (h *FloatHistogram) Size() int { // easier to iterate through. Still, the safest bet is to use maxEmptyBuckets==0 // and only use a larger number if you know what you are doing. func (h *FloatHistogram) Compact(maxEmptyBuckets int) *FloatHistogram { - h.PositiveBuckets, h.PositiveSpans = compactBuckets( - h.PositiveBuckets, h.PositiveSpans, maxEmptyBuckets, false, + h.PositiveBuckets, _, h.PositiveSpans = compactBuckets( + h.PositiveBuckets, nil, h.PositiveSpans, maxEmptyBuckets, false, ) - h.NegativeBuckets, h.NegativeSpans = compactBuckets( - h.NegativeBuckets, h.NegativeSpans, maxEmptyBuckets, false, + h.NegativeBuckets, _, h.NegativeSpans = compactBuckets( + h.NegativeBuckets, nil, h.NegativeSpans, maxEmptyBuckets, false, ) return h } +// kahanCompact works like Compact, but it is specialized for FloatHistogram's KahanAdd method. +// c is a histogram holding the Kahan compensation term. +func (h *FloatHistogram) kahanCompact(maxEmptyBuckets int, c *FloatHistogram, +) (updatedH, updatedC *FloatHistogram) { + h.PositiveBuckets, c.PositiveBuckets, h.PositiveSpans = compactBuckets( + h.PositiveBuckets, c.PositiveBuckets, h.PositiveSpans, maxEmptyBuckets, false, + ) + h.NegativeBuckets, c.NegativeBuckets, h.NegativeSpans = compactBuckets( + h.NegativeBuckets, c.NegativeBuckets, h.NegativeSpans, maxEmptyBuckets, false, + ) + return h, c +} + // DetectReset returns true if the receiving histogram is missing any buckets // that have a non-zero population in the provided previous histogram. It also // returns true if any count (in any bucket, in the zero count, or in the count @@ -652,7 +782,7 @@ func (h *FloatHistogram) DetectReset(previous *FloatHistogram) bool { // ZeroThreshold decreased. return true } - previousZeroCount, newThreshold := previous.zeroCountForLargerThreshold(h.ZeroThreshold) + previousZeroCount, newThreshold, _ := previous.zeroCountForLargerThreshold(h.ZeroThreshold, nil) if newThreshold != h.ZeroThreshold { // ZeroThreshold is within a populated bucket in previous // histogram. @@ -847,30 +977,42 @@ func (h *FloatHistogram) Validate() error { } // zeroCountForLargerThreshold returns what the histogram's zero count would be -// if the ZeroThreshold had the provided larger (or equal) value. If the -// provided value is less than the histogram's ZeroThreshold, the method panics. +// if the ZeroThreshold had the provided larger (or equal) value. It also returns the +// zero count of the compensation histogram `c` if provided (used for Kahan summation). +// +// If the provided ZeroThreshold is less than the histogram's ZeroThreshold, the method panics. // If the largerThreshold ends up within a populated bucket of the histogram, it // is adjusted upwards to the lower limit of that bucket (all in terms of // absolute values) and that bucket's count is included in the returned // count. The adjusted threshold is returned, too. 
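// Not part of this change: throughout the patch, folding a compensated pair
// (b, bc) into a compensated running sum (a, ac) is spelled as two successive
// Kahan increments — first the primary value, then its compensation term.
// A minimal sketch of the recurring pattern; kahanAddPair is a hypothetical helper:
func kahanAddPair(b, bc, a, ac float64) (newA, newAC float64) {
	a, ac = kahansum.Inc(b, a, ac)  // fold in the primary value
	a, ac = kahansum.Inc(bc, a, ac) // fold in its compensation term
	return a, ac
}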
-func (h *FloatHistogram) zeroCountForLargerThreshold(largerThreshold float64) (count, threshold float64) { +func (h *FloatHistogram) zeroCountForLargerThreshold( + largerThreshold float64, c *FloatHistogram) (hZeroCount, threshold, cZeroCount float64, +) { + if c != nil { + cZeroCount = c.ZeroCount + } // Fast path. if largerThreshold == h.ZeroThreshold { - return h.ZeroCount, largerThreshold + return h.ZeroCount, largerThreshold, cZeroCount } if largerThreshold < h.ZeroThreshold { panic(fmt.Errorf("new threshold %f is less than old threshold %f", largerThreshold, h.ZeroThreshold)) } outer: for { - count = h.ZeroCount + hZeroCount = h.ZeroCount i := h.PositiveBucketIterator() + bucketsIdx := 0 for i.Next() { b := i.At() if b.Lower >= largerThreshold { break } - count += b.Count // Bucket to be merged into zero bucket. + // Bucket to be merged into zero bucket. + hZeroCount, cZeroCount = kahansum.Inc(b.Count, hZeroCount, cZeroCount) + if c != nil { + hZeroCount, cZeroCount = kahansum.Inc(c.PositiveBuckets[bucketsIdx], hZeroCount, cZeroCount) + } if b.Upper > largerThreshold { // New threshold ended up within a bucket. if it's // populated, we need to adjust largerThreshold before @@ -880,14 +1022,20 @@ outer: } break } + bucketsIdx++ } i = h.NegativeBucketIterator() + bucketsIdx = 0 for i.Next() { b := i.At() if b.Upper <= -largerThreshold { break } - count += b.Count // Bucket to be merged into zero bucket. + // Bucket to be merged into zero bucket. + hZeroCount, cZeroCount = kahansum.Inc(b.Count, hZeroCount, cZeroCount) + if c != nil { + hZeroCount, cZeroCount = kahansum.Inc(c.NegativeBuckets[bucketsIdx], hZeroCount, cZeroCount) + } if b.Lower < -largerThreshold { // New threshold ended up within a bucket. If // it's populated, we need to adjust @@ -900,15 +1048,17 @@ outer: } break } + bucketsIdx++ } - return count, largerThreshold + return hZeroCount, largerThreshold, cZeroCount } } // trimBucketsInZeroBucket removes all buckets that are within the zero // bucket. It assumes that the zero threshold is at a bucket boundary and that // the counts in the buckets to remove are already part of the zero count. -func (h *FloatHistogram) trimBucketsInZeroBucket() { +// c is a histogram holding the Kahan compensation term. +func (h *FloatHistogram) trimBucketsInZeroBucket(c *FloatHistogram) { i := h.PositiveBucketIterator() bucketsIdx := 0 for i.Next() { @@ -917,6 +1067,9 @@ func (h *FloatHistogram) trimBucketsInZeroBucket() { break } h.PositiveBuckets[bucketsIdx] = 0 + if c != nil { + c.PositiveBuckets[bucketsIdx] = 0 + } bucketsIdx++ } i = h.NegativeBucketIterator() @@ -927,34 +1080,46 @@ func (h *FloatHistogram) trimBucketsInZeroBucket() { break } h.NegativeBuckets[bucketsIdx] = 0 + if c != nil { + c.NegativeBuckets[bucketsIdx] = 0 + } bucketsIdx++ } // We are abusing Compact to trim the buckets set to zero // above. Premature compacting could cause additional cost, but this // code path is probably rarely used anyway. - h.Compact(0) + if c != nil { + h.kahanCompact(0, c) + } else { + h.Compact(0) + } } // reconcileZeroBuckets finds a zero bucket large enough to include the zero // buckets of both histograms (the receiving histogram and the other histogram) // with a zero threshold that is not within a populated bucket in either -// histogram. This method modifies the receiving histogram accordingly, but -// leaves the other histogram as is. Instead, it returns the zero count the -// other histogram would have if it were modified. 
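// Not part of this change: an illustration of the reconciliation implemented
// below. Suppose h has ZeroThreshold 0.01 and other has ZeroThreshold 1
// (values in the spirit of the tests further down). The loop widens h's zero
// bucket to threshold 1, Kahan-folding every bucket that now falls inside
// (-1, 1) into h.ZeroCount (and the matching compensation buckets into
// c.ZeroCount), then trims those buckets. other is left untouched; its
// would-be zero count and compensation term are returned for the caller to
// fold in, as KahanAdd does:
otherZero, otherZeroC := h.reconcileZeroBuckets(other, c)
h.ZeroCount, c.ZeroCount = kahansum.Inc(otherZero, h.ZeroCount, c.ZeroCount)
h.ZeroCount, c.ZeroCount = kahansum.Inc(otherZeroC, h.ZeroCount, c.ZeroCount)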
-func (h *FloatHistogram) reconcileZeroBuckets(other *FloatHistogram) float64 { - otherZeroCount := other.ZeroCount +// histogram. This method modifies the receiving histogram accordingly, and +// also modifies the compensation histogram `c` (used for Kahan summation) if provided, +// but leaves the other histogram as is. Instead, it returns the zero count the +// other histogram would have if it were modified, as well as its Kahan compensation term. +func (h *FloatHistogram) reconcileZeroBuckets(other, c *FloatHistogram) (otherZeroCount, otherCZeroCount float64) { + otherZeroCount = other.ZeroCount otherZeroThreshold := other.ZeroThreshold for otherZeroThreshold != h.ZeroThreshold { if h.ZeroThreshold > otherZeroThreshold { - otherZeroCount, otherZeroThreshold = other.zeroCountForLargerThreshold(h.ZeroThreshold) + otherZeroCount, otherZeroThreshold, otherCZeroCount = other.zeroCountForLargerThreshold(h.ZeroThreshold, nil) } if otherZeroThreshold > h.ZeroThreshold { - h.ZeroCount, h.ZeroThreshold = h.zeroCountForLargerThreshold(otherZeroThreshold) - h.trimBucketsInZeroBucket() + var cZeroCount float64 + h.ZeroCount, h.ZeroThreshold, cZeroCount = h.zeroCountForLargerThreshold(otherZeroThreshold, c) + if c != nil { + c.ZeroCount = cZeroCount + } + h.trimBucketsInZeroBucket(c) } } - return otherZeroCount + return otherZeroCount, otherCZeroCount } // floatBucketIterator is a low-level constructor for bucket iterators. @@ -1369,6 +1534,145 @@ func addBuckets( return spansA, bucketsA } +// kahanAddBuckets works like addBuckets but it is used in FloatHistogram's KahanAdd method +// and takes additional arguments, compensationBucketsA and compensationBucketsB, +// which hold the Kahan compensation values associated with histograms A and B. +// It returns the resulting spans/buckets and compensation buckets. +func kahanAddBuckets( + schema int32, threshold float64, negative bool, + spansA []Span, bucketsA []float64, + spansB []Span, bucketsB []float64, + compensationBucketsA, compensationBucketsB []float64, +) (newSpans []Span, newBucketsA, newBucketsC []float64) { + var ( + iSpan = -1 + iBucket = -1 + iInSpan int32 + indexA int32 + indexB int32 + bIdxB int + bucketB float64 + compensationBucketB float64 + deltaIndex int32 + lowerThanThreshold = true + ) + + for _, spanB := range spansB { + indexB += spanB.Offset + for j := 0; j < int(spanB.Length); j++ { + if lowerThanThreshold && IsExponentialSchema(schema) && getBoundExponential(indexB, schema) <= threshold { + goto nextLoop + } + lowerThanThreshold = false + + bucketB = bucketsB[bIdxB] + if compensationBucketsB != nil { + compensationBucketB = compensationBucketsB[bIdxB] + } + if negative { + bucketB *= -1 + compensationBucketB *= -1 + } + + if iSpan == -1 { + if len(spansA) == 0 || spansA[0].Offset > indexB { + // Add bucket before all others. + bucketsA = append(bucketsA, 0) + copy(bucketsA[1:], bucketsA) + bucketsA[0] = bucketB + compensationBucketsA = append(compensationBucketsA, 0) + copy(compensationBucketsA[1:], compensationBucketsA) + compensationBucketsA[0] = compensationBucketB + if len(spansA) > 0 && spansA[0].Offset == indexB+1 { + spansA[0].Length++ + spansA[0].Offset-- + goto nextLoop + } + spansA = append(spansA, Span{}) + copy(spansA[1:], spansA) + spansA[0] = Span{Offset: indexB, Length: 1} + if len(spansA) > 1 { + // Convert the absolute offset in the formerly + // first span to a relative offset. + spansA[1].Offset -= indexB + 1 + } + goto nextLoop + } else if spansA[0].Offset == indexB { + // Just add to first bucket. 
+ bucketsA[0], compensationBucketsA[0] = kahansum.Inc(bucketB, bucketsA[0], compensationBucketsA[0]) + bucketsA[0], compensationBucketsA[0] = kahansum.Inc(compensationBucketB, bucketsA[0], compensationBucketsA[0]) + goto nextLoop + } + iSpan, iBucket, iInSpan = 0, 0, 0 + indexA = spansA[0].Offset + } + deltaIndex = indexB - indexA + for { + remainingInSpan := int32(spansA[iSpan].Length) - iInSpan + if deltaIndex < remainingInSpan { + // Bucket is in current span. + iBucket += int(deltaIndex) + iInSpan += deltaIndex + bucketsA[iBucket], compensationBucketsA[iBucket] = kahansum.Inc(bucketB, bucketsA[iBucket], compensationBucketsA[iBucket]) + bucketsA[iBucket], compensationBucketsA[iBucket] = kahansum.Inc(compensationBucketB, bucketsA[iBucket], compensationBucketsA[iBucket]) + break + } + deltaIndex -= remainingInSpan + iBucket += int(remainingInSpan) + iSpan++ + if iSpan == len(spansA) || deltaIndex < spansA[iSpan].Offset { + // Bucket is in gap behind previous span (or there are no further spans). + bucketsA = append(bucketsA, 0) + copy(bucketsA[iBucket+1:], bucketsA[iBucket:]) + bucketsA[iBucket] = bucketB + compensationBucketsA = append(compensationBucketsA, 0) + copy(compensationBucketsA[iBucket+1:], compensationBucketsA[iBucket:]) + compensationBucketsA[iBucket] = compensationBucketB + switch { + case deltaIndex == 0: + // Directly after previous span, extend previous span. + if iSpan < len(spansA) { + spansA[iSpan].Offset-- + } + iSpan-- + iInSpan = int32(spansA[iSpan].Length) + spansA[iSpan].Length++ + goto nextLoop + case iSpan < len(spansA) && deltaIndex == spansA[iSpan].Offset-1: + // Directly before next span, extend next span. + iInSpan = 0 + spansA[iSpan].Offset-- + spansA[iSpan].Length++ + goto nextLoop + default: + // No next span, or next span is not directly adjacent to new bucket. + // Add new span. + iInSpan = 0 + if iSpan < len(spansA) { + spansA[iSpan].Offset -= deltaIndex + 1 + } + spansA = append(spansA, Span{}) + copy(spansA[iSpan+1:], spansA[iSpan:]) + spansA[iSpan] = Span{Length: 1, Offset: deltaIndex} + goto nextLoop + } + } else { + // Try start of next span. + deltaIndex -= spansA[iSpan].Offset + iInSpan = 0 + } + } + + nextLoop: + indexA = indexB + indexB++ + bIdxB++ + } + } + + return spansA, bucketsA, compensationBucketsA +} + // floatBucketsMatch compares bucket values of two float histograms using binary float comparison // and returns true if all values match. func floatBucketsMatch(b1, b2 []float64) bool { @@ -1496,15 +1800,18 @@ func intersectCustomBucketBounds(boundsA, boundsB []float64) []float64 { // addCustomBucketsWithMismatches handles adding/subtracting custom bucket histograms // with mismatched bucket layouts by mapping both to an intersected layout. +// It also processes the Kahan compensation term if provided. 
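// Not part of this change: a hedged example of the custom-bucket mapping,
// assuming (based on the name) that intersectCustomBucketBounds keeps only
// the bounds common to both layouts:
//
//	boundsA := []float64{1, 2, 4} // buckets (-Inf,1], (1,2], (2,4], (4,+Inf)
//	boundsB := []float64{1, 4}    // buckets (-Inf,1], (1,4],        (4,+Inf)
//	// intersected = [1, 4]
//
// A's buckets (1,2] and (2,4] then land in the single target bucket (1,4] and
// are merged with kahansum.Inc, their compensation buckets riding along in
// cTargetBuckets; B's buckets map one-to-one. With negative=true the same
// mapping subtracts via kahansum.Dec instead.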
func addCustomBucketsWithMismatches( negative bool, spansA []Span, bucketsA, boundsA []float64, spansB []Span, bucketsB, boundsB []float64, + bucketsC []float64, intersectedBounds []float64, -) ([]Span, []float64) { +) ([]Span, []float64, []float64) { targetBuckets := make([]float64, len(intersectedBounds)+1) + cTargetBuckets := make([]float64, len(intersectedBounds)+1) - mapBuckets := func(spans []Span, buckets, bounds []float64, negative bool) { + mapBuckets := func(spans []Span, buckets, bounds []float64, negative, withCompensation bool) { srcIdx := 0 bucketIdx := 0 intersectIdx := 0 @@ -1530,9 +1837,12 @@ func addCustomBucketsWithMismatches( } if negative { - targetBuckets[targetIdx] -= value + targetBuckets[targetIdx], cTargetBuckets[targetIdx] = kahansum.Dec(value, targetBuckets[targetIdx], cTargetBuckets[targetIdx]) } else { - targetBuckets[targetIdx] += value + targetBuckets[targetIdx], cTargetBuckets[targetIdx] = kahansum.Inc(value, targetBuckets[targetIdx], cTargetBuckets[targetIdx]) + if withCompensation && bucketsC != nil { + targetBuckets[targetIdx], cTargetBuckets[targetIdx] = kahansum.Inc(bucketsC[bucketIdx], targetBuckets[targetIdx], cTargetBuckets[targetIdx]) + } } } srcIdx++ @@ -1541,21 +1851,23 @@ func addCustomBucketsWithMismatches( } } - // Map both histograms to the intersected layout. - mapBuckets(spansA, bucketsA, boundsA, false) - mapBuckets(spansB, bucketsB, boundsB, negative) + // Map histograms to the intersected layout. + mapBuckets(spansA, bucketsA, boundsA, false, true) + mapBuckets(spansB, bucketsB, boundsB, negative, false) // Build spans and buckets, excluding zero-valued buckets from the final result. - destSpans := spansA[:0] // Reuse spansA capacity for destSpans since we don't need it anymore. - destBuckets := targetBuckets[:0] // Reuse targetBuckets capacity for destBuckets since it's guaranteed to be large enough. + destSpans := spansA[:0] // Reuse spansA capacity for destSpans since we don't need it anymore. + destBuckets := targetBuckets[:0] // Reuse targetBuckets capacity for destBuckets since it's guaranteed to be large enough. + cDestBuckets := cTargetBuckets[:0] // Reuse cTargetBuckets capacity for cDestBuckets since it's guaranteed to be large enough. lastIdx := int32(-1) - for i, count := range targetBuckets { - if count == 0 { + for i := range targetBuckets { + if targetBuckets[i] == 0 && cTargetBuckets[i] == 0 { continue } - destBuckets = append(destBuckets, count) + destBuckets = append(destBuckets, targetBuckets[i]) + cDestBuckets = append(cDestBuckets, cTargetBuckets[i]) idx := int32(i) if len(destSpans) > 0 && idx == lastIdx+1 { @@ -1578,7 +1890,7 @@ func addCustomBucketsWithMismatches( lastIdx = idx } - return destSpans, destBuckets + return destSpans, destBuckets, cDestBuckets } // ReduceResolution reduces the float histogram's spans, buckets into target schema. @@ -1618,6 +1930,121 @@ func (h *FloatHistogram) ReduceResolution(targetSchema int32) error { return nil } +// kahanReduceResolution works like reduceResolution, but it is specialized for FloatHistogram's KahanAdd method. +// Unlike reduceResolution, which supports both float and integer buckets, this function only operates on float buckets. +// It also takes an additional argument, originCompensationBuckets, representing the compensation buckets for the origin histogram. +// Modifies both the origin histogram buckets and their associated compensation buckets. 
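// Not part of this change: for exponential schemas, lowering the schema by one
// merges each adjacent pair of buckets (assuming the upstream index mapping
// targetIdx = ((idx-1) >> (originSchema-targetSchema)) + 1, origin indices 1,2
// land in target 1, indices 3,4 in target 2, and so on). Each merge is two
// compensated increments, one for the count and one for its compensation term
// — a minimal sketch with a hypothetical helper:
func mergeBucketPair(b1, c1, b2, c2 float64) (count, comp float64) {
	count, comp = kahansum.Inc(b2, b1, c1) // merge the two primary counts
	return kahansum.Inc(c2, count, comp)   // then their compensation terms
}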
+func kahanReduceResolution( + originSpans []Span, + originReceivingBuckets []float64, + originCompensationBuckets []float64, + originSchema, + targetSchema int32, + inplace bool, +) (newSpans []Span, newReceivingBuckets, newCompensationBuckets []float64) { + var ( + targetSpans []Span // The spans in the target schema. + targetReceivingBuckets []float64 // The receiving bucket counts in the target schema. + targetCompensationBuckets []float64 // The compensation bucket counts in the target schema. + bucketIdx int32 // The index of bucket in the origin schema. + bucketCountIdx int // The position of a bucket in origin bucket count slice `originBuckets`. + targetBucketIdx int32 // The index of bucket in the target schema. + lastTargetBucketIdx int32 // The index of the last added target bucket. + ) + + if inplace { + // Slice reuse is safe because when reducing the resolution, + // target slices don't grow faster than origin slices are being read. + targetSpans = originSpans[:0] + targetReceivingBuckets = originReceivingBuckets[:0] + targetCompensationBuckets = originCompensationBuckets[:0] + } + + for _, span := range originSpans { + // Determine the index of the first bucket in this span. + bucketIdx += span.Offset + for j := 0; j < int(span.Length); j++ { + // Determine the index of the bucket in the target schema from the index in the original schema. + targetBucketIdx = targetIdx(bucketIdx, originSchema, targetSchema) + + switch { + case len(targetSpans) == 0: + // This is the first span in the targetSpans. + span := Span{ + Offset: targetBucketIdx, + Length: 1, + } + targetSpans = append(targetSpans, span) + targetReceivingBuckets = append(targetReceivingBuckets, originReceivingBuckets[bucketCountIdx]) + lastTargetBucketIdx = targetBucketIdx + targetCompensationBuckets = append(targetCompensationBuckets, originCompensationBuckets[bucketCountIdx]) + + case lastTargetBucketIdx == targetBucketIdx: + // The current bucket has to be merged into the same target bucket as the previous bucket. + lastBucketIdx := len(targetReceivingBuckets) - 1 + targetReceivingBuckets[lastBucketIdx], targetCompensationBuckets[lastBucketIdx] = kahansum.Inc( + originReceivingBuckets[bucketCountIdx], + targetReceivingBuckets[lastBucketIdx], + targetCompensationBuckets[lastBucketIdx], + ) + targetReceivingBuckets[lastBucketIdx], targetCompensationBuckets[lastBucketIdx] = kahansum.Inc( + originCompensationBuckets[bucketCountIdx], + targetReceivingBuckets[lastBucketIdx], + targetCompensationBuckets[lastBucketIdx], + ) + + case (lastTargetBucketIdx + 1) == targetBucketIdx: + // The current bucket has to go into a new target bucket, + // and that bucket is next to the previous target bucket, + // so we add it to the current target span. + targetSpans[len(targetSpans)-1].Length++ + lastTargetBucketIdx++ + targetReceivingBuckets = append(targetReceivingBuckets, originReceivingBuckets[bucketCountIdx]) + targetCompensationBuckets = append(targetCompensationBuckets, originCompensationBuckets[bucketCountIdx]) + + case (lastTargetBucketIdx + 1) < targetBucketIdx: + // The current bucket has to go into a new target bucket, + // and that bucket is separated by a gap from the previous target bucket, + // so we need to add a new target span. 
+ span := Span{ + Offset: targetBucketIdx - lastTargetBucketIdx - 1, + Length: 1, + } + targetSpans = append(targetSpans, span) + lastTargetBucketIdx = targetBucketIdx + targetReceivingBuckets = append(targetReceivingBuckets, originReceivingBuckets[bucketCountIdx]) + targetCompensationBuckets = append(targetCompensationBuckets, originCompensationBuckets[bucketCountIdx]) + } + + bucketIdx++ + bucketCountIdx++ + } + } + + return targetSpans, targetReceivingBuckets, targetCompensationBuckets +} + +// newCompensationHistogram initializes a new compensation histogram that can be used +// alongside the current FloatHistogram in Kahan summation. +// The compensation histogram is structured to match the receiving histogram's bucket layout +// including its schema, zero threshold and custom values, and it shares spans with the receiving +// histogram. However, the bucket values in the compensation histogram are initialized to zero. +func (h *FloatHistogram) newCompensationHistogram() *FloatHistogram { + c := &FloatHistogram{ + CounterResetHint: h.CounterResetHint, + Schema: h.Schema, + ZeroThreshold: h.ZeroThreshold, + CustomValues: h.CustomValues, + PositiveBuckets: make([]float64, len(h.PositiveBuckets)), + PositiveSpans: h.PositiveSpans, + NegativeSpans: h.NegativeSpans, + } + if !h.UsesCustomBuckets() { + c.NegativeBuckets = make([]float64, len(h.NegativeBuckets)) + } + return c +} + // checkSchemaAndBounds checks if two histograms are compatible because they // both use a standard exponential schema or because they both are NHCBs. func (h *FloatHistogram) checkSchemaAndBounds(other *FloatHistogram) error { @@ -1659,3 +2086,27 @@ func (h *FloatHistogram) adjustCounterReset(other *FloatHistogram) (counterReset } return false } + +// HasOverflow reports whether any of the FloatHistogram's fields contain an infinite value. +// This can happen when aggregating multiple histograms and exceeding float64 capacity. +func (h *FloatHistogram) HasOverflow() bool { + if math.IsInf(h.ZeroCount, 0) || math.IsInf(h.Count, 0) || math.IsInf(h.Sum, 0) { + return true + } + for _, v := range h.PositiveBuckets { + if math.IsInf(v, 0) { + return true + } + } + for _, v := range h.NegativeBuckets { + if math.IsInf(v, 0) { + return true + } + } + for _, v := range h.CustomValues { + if math.IsInf(v, 0) { + return true + } + } + return false +} diff --git a/model/histogram/float_histogram_test.go b/model/histogram/float_histogram_test.go index 5c29544c8f..caf77b6256 100644 --- a/model/histogram/float_histogram_test.go +++ b/model/histogram/float_histogram_test.go @@ -2514,6 +2514,243 @@ func TestFloatHistogramAdd(t *testing.T) { t.Run(c.name, func(t *testing.T) { testHistogramAdd(t, c.in1, c.in2, c.expected, c.expErrMsg, c.expCounterResetCollision, c.expNHCBBoundsReconciled) testHistogramAdd(t, c.in2, c.in1, c.expected, c.expErrMsg, c.expCounterResetCollision, c.expNHCBBoundsReconciled) + testHistogramKahanAdd(t, c.in1, nil, c.in2, c.expected, c.expErrMsg, c.expCounterResetCollision, c.expNHCBBoundsReconciled) + testHistogramKahanAdd(t, c.in2, nil, c.in1, c.expected, c.expErrMsg, c.expCounterResetCollision, c.expNHCBBoundsReconciled) + }) + } +} + +// TestKahanAddWithCompHistogram tests KahanAdd. +// Test cases provide two float histograms and a compensation histogram with predefined values. 
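// Not part of this change: the calling pattern KahanAdd is designed for,
// mirroring the funcSumOverTime change further down — accumulate with a lazily
// created compensation histogram, then fold it back in exactly once at the end:
func kahanSumHistograms(hs []*FloatHistogram) (*FloatHistogram, error) {
	sum := hs[0].Copy()
	var (
		comp *FloatHistogram
		err  error
	)
	for _, h := range hs[1:] {
		// comp is nil on the first call, so KahanAdd allocates it;
		// we must keep using the returned histogram from then on.
		if comp, _, _, err = sum.KahanAdd(h, comp); err != nil {
			return nil, err
		}
	}
	if comp != nil {
		if sum, _, _, err = sum.Add(comp); err != nil {
			return nil, err
		}
	}
	return sum.Compact(0), nil
}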
+func TestKahanAddWithCompHistogram(t *testing.T) { + cases := []struct { + name string + in1, comp, in2, expectedSum *FloatHistogram + expErrMsg string + expCounterResetCollision bool + expNHCBBoundsReconciled bool + }{ + { + name: "larger zero bucket in first histogram", + in1: &FloatHistogram{ + ZeroThreshold: 1, + ZeroCount: 17, + Count: 21, + Sum: 1.234, + PositiveSpans: []Span{{1, 2}, {0, 3}}, + PositiveBuckets: []float64{2, 3, 6, 2, 5}, + NegativeSpans: []Span{{4, 2}, {1, 2}}, + NegativeBuckets: []float64{1, 1, 4, 4}, + }, + comp: &FloatHistogram{ + ZeroThreshold: 1, + PositiveSpans: []Span{{1, 2}, {0, 3}}, + PositiveBuckets: []float64{0.02, 0.03, 0.06, 0.02, 0.05}, + NegativeSpans: []Span{{4, 2}, {1, 2}}, + NegativeBuckets: []float64{0.01, 0.01, 0.04, 0.04}, + }, + in2: &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 11, + Count: 30, + Sum: 2.345, + PositiveSpans: []Span{{-2, 2}, {2, 3}}, + PositiveBuckets: []float64{1, 0, 3, 4, 7}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3, 1, 5, 6}, + }, + expectedSum: &FloatHistogram{ + ZeroThreshold: 1, + ZeroCount: 29, + Count: 51, + Sum: 3.579, + PositiveSpans: []Span{{1, 2}, {0, 3}}, + PositiveBuckets: []float64{2.02, 6.03, 10.06, 9.02, 5.05}, + NegativeSpans: []Span{{3, 3}, {1, 3}}, + NegativeBuckets: []float64{3, 2.01, 1.01, 4.04, 9.04, 6}, + }, + expErrMsg: "", + expCounterResetCollision: false, + expNHCBBoundsReconciled: false, + }, + { + name: "smaller zero bucket in first histogram", + in1: &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 11, + Count: 40, + Sum: 2.345, + PositiveSpans: []Span{{-2, 2}, {2, 3}}, + PositiveBuckets: []float64{1, 2, 3, 4, 7}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3, 1, 5, 6}, + }, + comp: &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 0, + PositiveSpans: []Span{{-2, 2}, {2, 3}}, + PositiveBuckets: []float64{0.02, 0.03, 0.06, 0.07, 0.05}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{0.01, 0.01, 0.04, 0.04}, + }, + in2: &FloatHistogram{ + ZeroThreshold: 1, + ZeroCount: 17, + Count: 11, + Sum: 1.234, + PositiveSpans: []Span{{1, 2}, {0, 3}}, + PositiveBuckets: []float64{2, 3, 6, 2, 5}, + NegativeSpans: []Span{{4, 2}, {1, 2}}, + NegativeBuckets: []float64{1, 1, 4, 4}, + }, + expectedSum: &FloatHistogram{ + ZeroThreshold: 1, + ZeroCount: 31.05, + Count: 51, + Sum: 3.579, + PositiveSpans: []Span{{1, 5}}, + PositiveBuckets: []float64{2, 6.06, 10.07, 9.05, 5}, + NegativeSpans: []Span{{3, 3}, {1, 3}}, + NegativeBuckets: []float64{3.01, 2.01, 1, 4, 9.04, 6.04}, + }, + expErrMsg: "", + expCounterResetCollision: false, + expNHCBBoundsReconciled: false, + }, + { + name: "first histogram contains zero buckets and Compact is called", + in1: &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 11, + Count: 30, + Sum: 2.345, + PositiveSpans: []Span{{-2, 2}, {1, 1}, {1, 3}}, + PositiveBuckets: []float64{1, 3, 3, 0, 7, -6}, + }, + comp: &FloatHistogram{ + ZeroThreshold: 0.01, + PositiveSpans: []Span{{-2, 2}, {1, 1}, {1, 3}}, + PositiveBuckets: []float64{7, 2, 0.03, 0, 0.05, 0.06}, + }, + in2: &FloatHistogram{ + ZeroThreshold: 1, + ZeroCount: 17, + Count: 21, + Sum: 1.234, + PositiveSpans: []Span{{1, 2}, {1, 2}}, + PositiveBuckets: []float64{2, 3, 2, 5}, + }, + expectedSum: &FloatHistogram{ + ZeroThreshold: 1, + ZeroCount: 41, + Count: 51, + Sum: 3.579, + PositiveSpans: []Span{{1, 2}, {1, 2}}, + PositiveBuckets: []float64{5.03, 3, 9.05, -0.94}, + }, + expErrMsg: "", + expCounterResetCollision: false, + expNHCBBoundsReconciled: false,
+ }, + { + name: "reduce resolution", + in1: &FloatHistogram{ + Schema: 2, + ZeroThreshold: 0.01, + ZeroCount: 11, + Count: 30, + Sum: 2.345, + PositiveSpans: []Span{{-2, 2}, {1, 1}, {1, 3}}, + PositiveBuckets: []float64{1, 3, 1e100, 0, 7, -6}, + }, + comp: &FloatHistogram{ + Schema: 2, + ZeroThreshold: 0.01, + ZeroCount: 1, + PositiveSpans: []Span{{-2, 2}, {1, 1}, {1, 3}}, + PositiveBuckets: []float64{7, 2, 0.03, 0, 0.05, 0.06}, + }, + in2: &FloatHistogram{ + Schema: 1, + ZeroThreshold: 1, + ZeroCount: 17, + Count: 21, + Sum: 1.234, + PositiveSpans: []Span{{1, 2}, {1, 2}}, + PositiveBuckets: []float64{-1e100, 3, 2, 5}, + }, + expectedSum: &FloatHistogram{ + Schema: 1, + ZeroThreshold: 1, + ZeroCount: 42, + Count: 51, + Sum: 3.579, + PositiveSpans: []Span{{1, 5}}, + PositiveBuckets: []float64{0.03, 10.05, -5.94, 2, 5}, + }, + expErrMsg: "", + expCounterResetCollision: false, + expNHCBBoundsReconciled: false, + }, + { + name: "reduce resolution of 'other' histogram", + in1: &FloatHistogram{ + Schema: 0, + ZeroThreshold: 1, + ZeroCount: 17, + Count: 21, + Sum: 1.234, + PositiveSpans: []Span{{1, 2}, {1, 2}}, + PositiveBuckets: []float64{2, 3, 2, 5}, + }, + comp: &FloatHistogram{ + Schema: 0, + ZeroThreshold: 1, + ZeroCount: 1, + PositiveSpans: []Span{{1, 2}, {1, 2}}, + PositiveBuckets: []float64{17, 2, 0.03, 0}, + }, + in2: &FloatHistogram{ + Schema: 2, + ZeroThreshold: 0.01, + ZeroCount: 11, + Count: 30, + Sum: 2.345, + PositiveSpans: []Span{{-2, 3}, {1, 1}, {1, 3}}, + PositiveBuckets: []float64{1e100, 4.1, -1e100, 2.1, 0, 7, -6}, + }, + expectedSum: &FloatHistogram{ + Schema: 0, + ZeroThreshold: 1, + ZeroCount: 33.1, + Count: 51, + Sum: 3.579, + PositiveSpans: []Span{{1, 2}, {1, 2}}, + PositiveBuckets: []float64{21.1, 6, 2.03, 5}, + }, + expErrMsg: "", + expCounterResetCollision: false, + expNHCBBoundsReconciled: false, + }, + { + name: "warn on counter reset hint collision", + in1: &FloatHistogram{ + Schema: CustomBucketsSchema, + CounterResetHint: CounterReset, + }, + in2: &FloatHistogram{ + Schema: CustomBucketsSchema, + CounterResetHint: NotCounterReset, + }, + expErrMsg: "", + expCounterResetCollision: true, + expNHCBBoundsReconciled: false, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + testHistogramKahanAdd(t, c.in1, c.comp, c.in2, c.expectedSum, c.expErrMsg, c.expCounterResetCollision, c.expNHCBBoundsReconciled) + }) + } +} @@ -2557,6 +2794,68 @@ func testHistogramAdd(t *testing.T, a, b, expected *FloatHistogram, expErrMsg st } } +func testHistogramKahanAdd( + t *testing.T, a, c, b, expectedSum *FloatHistogram, expErrMsg string, expCounterResetCollision, expNHCBBoundsReconciled bool, +) { + var ( + aCopy = a.Copy() + bCopy = b.Copy() + cCopy *FloatHistogram + expectedSumCopy *FloatHistogram + ) + + if c != nil { + cCopy = c.Copy() + } + + if expectedSum != nil { + expectedSumCopy = expectedSum.Copy() + } + + comp, counterResetCollision, nhcbBoundsReconciled, err := aCopy.KahanAdd(bCopy, cCopy) + if expErrMsg != "" { + require.EqualError(t, err, expErrMsg) + } else { + require.NoError(t, err) + } + + var res *FloatHistogram + if comp != nil { + // Check that aCopy and its compensation histogram layouts match after addition.
+ require.Equal(t, aCopy.Schema, comp.Schema) + require.Equal(t, aCopy.ZeroThreshold, comp.ZeroThreshold) + require.Equal(t, aCopy.PositiveSpans, comp.PositiveSpans) + require.Equal(t, aCopy.NegativeSpans, comp.NegativeSpans) + require.Len(t, aCopy.CustomValues, len(comp.CustomValues)) + require.Len(t, aCopy.PositiveBuckets, len(comp.PositiveBuckets)) + require.Len(t, aCopy.NegativeBuckets, len(comp.NegativeBuckets)) + + res, _, _, err = aCopy.Add(comp) + if expErrMsg != "" { + require.EqualError(t, err, expErrMsg) + } else { + require.NoError(t, err) + } + } + + // Check that the warnings are correct. + require.Equal(t, expCounterResetCollision, counterResetCollision) + require.Equal(t, expNHCBBoundsReconciled, nhcbBoundsReconciled) + + if expectedSum != nil { + res.Compact(0) + expectedSumCopy.Compact(0) + + require.Equal(t, expectedSumCopy, res) + + // Has it also happened in-place? + require.Equal(t, expectedSumCopy, aCopy) + + // Check that the argument was not mutated. + require.Equal(t, b, bCopy) + } +} + func TestFloatHistogramSub(t *testing.T) { // This has fewer test cases than TestFloatHistogramAdd because Add and // Sub share most of the trickier code. diff --git a/model/histogram/generic.go b/model/histogram/generic.go index 61fc5067f2..9ec9e9cd4b 100644 --- a/model/histogram/generic.go +++ b/model/histogram/generic.go @@ -230,14 +230,29 @@ func (b *baseBucketIterator[BC, IBC]) strippedAt() strippedBucket[BC] { // compactBuckets is a generic function used by both Histogram.Compact and // FloatHistogram.Compact. Set deltaBuckets to true if the provided buckets are // deltas. Set it to false if the buckets contain absolute counts. -func compactBuckets[IBC InternalBucketCount](buckets []IBC, spans []Span, maxEmptyBuckets int, deltaBuckets bool) ([]IBC, []Span) { +// For float histograms, deltaBuckets is always false. +// primaryBuckets hold the main histogram values, while compensationBuckets (if provided) store +// Kahan compensation values. compensationBuckets can only be provided for float histograms +// and are processed in parallel with primaryBuckets to maintain synchronization. +func compactBuckets[IBC InternalBucketCount]( + primaryBuckets []IBC, compensationBuckets []float64, + spans []Span, maxEmptyBuckets int, deltaBuckets bool, +) (updatedPrimaryBuckets []IBC, updatedCompensationBuckets []float64, updatedSpans []Span) { + if deltaBuckets && compensationBuckets != nil { + panic("histogram type mismatch: deltaBuckets cannot be true when compensationBuckets is provided") + } else if compensationBuckets != nil && len(primaryBuckets) != len(compensationBuckets) { + panic(fmt.Errorf( + "primary buckets layout (%v) mismatch against associated compensation buckets layout (%v)", + primaryBuckets, compensationBuckets), + ) + } // Fast path: If there are no empty buckets AND no offset in any span is // <= maxEmptyBuckets AND no span has length 0, there is nothing to do and we can return // immediately. We check that first because it's cheap and presumably // common. 
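// Not part of this change: why emptiness must be checked on both the primary
// and the compensation slice below — a primary count can cancel to exactly
// zero while its compensation term still carries the true value. Using the
// Inc helper from the util/kahansum package added by this patch:
func compensatedCancellation() (sum, c float64) {
	sum, c = kahansum.Inc(1e16, 0, 0)    // sum=1e16, c=0
	sum, c = kahansum.Inc(1, sum, c)     // sum=1e16, c=1: the 1 lives only in c
	sum, c = kahansum.Inc(-1e16, sum, c) // sum=0,    c=1: the true value is 1, not 0
	return sum, c                        // a bucket in this state must not be compacted away
}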
nothingToDo := true var currentBucketAbsolute IBC - for _, bucket := range buckets { + for _, bucket := range primaryBuckets { if deltaBuckets { currentBucketAbsolute += bucket } else { @@ -256,7 +271,7 @@ func compactBuckets[IBC InternalBucketCount](buckets []IBC, spans []Span, maxEmp } } if nothingToDo { - return buckets, spans + return primaryBuckets, compensationBuckets, spans } } @@ -268,12 +283,19 @@ func compactBuckets[IBC InternalBucketCount](buckets []IBC, spans []Span, maxEmp emptyBucketsHere := func() int { i := 0 abs := currentBucketAbsolute - for uint32(i)+posInSpan < spans[iSpan].Length && abs == 0 { + comp := float64(0) + if compensationBuckets != nil { + comp = compensationBuckets[iBucket] + } + for uint32(i)+posInSpan < spans[iSpan].Length && abs == 0 && comp == 0 { i++ - if i+iBucket >= len(buckets) { + if i+iBucket >= len(primaryBuckets) { break } - abs = buckets[i+iBucket] + abs = primaryBuckets[i+iBucket] + if compensationBuckets != nil { + comp = compensationBuckets[i+iBucket] + } } return i } @@ -313,11 +335,11 @@ func compactBuckets[IBC InternalBucketCount](buckets []IBC, spans []Span, maxEmp // Cut out empty buckets from start and end of spans, no matter // what. Also cut out empty buckets from the middle of a span but only // if there are more than maxEmptyBuckets consecutive empty buckets. - for iBucket < len(buckets) { + for iBucket < len(primaryBuckets) { if deltaBuckets { - currentBucketAbsolute += buckets[iBucket] + currentBucketAbsolute += primaryBuckets[iBucket] } else { - currentBucketAbsolute = buckets[iBucket] + currentBucketAbsolute = primaryBuckets[iBucket] } if nEmpty := emptyBucketsHere(); nEmpty > 0 { if posInSpan > 0 && @@ -334,11 +356,14 @@ func compactBuckets[IBC InternalBucketCount](buckets []IBC, spans []Span, maxEmp continue } // In all other cases, we cut out the empty buckets. - if deltaBuckets && iBucket+nEmpty < len(buckets) { - currentBucketAbsolute = -buckets[iBucket] - buckets[iBucket+nEmpty] += buckets[iBucket] + if deltaBuckets && iBucket+nEmpty < len(primaryBuckets) { + currentBucketAbsolute = -primaryBuckets[iBucket] + primaryBuckets[iBucket+nEmpty] += primaryBuckets[iBucket] + } + primaryBuckets = append(primaryBuckets[:iBucket], primaryBuckets[iBucket+nEmpty:]...) + if compensationBuckets != nil { + compensationBuckets = append(compensationBuckets[:iBucket], compensationBuckets[iBucket+nEmpty:]...) } - buckets = append(buckets[:iBucket], buckets[iBucket+nEmpty:]...) if posInSpan == 0 { // Start of span. 
if nEmpty == int(spans[iSpan].Length) { @@ -388,8 +413,8 @@ func compactBuckets[IBC InternalBucketCount](buckets []IBC, spans []Span, maxEmp iSpan++ } } - if maxEmptyBuckets == 0 || len(buckets) == 0 { - return buckets, spans + if maxEmptyBuckets == 0 || len(primaryBuckets) == 0 { + return primaryBuckets, compensationBuckets, spans } // Finally, check if any offsets between spans are small enough to merge @@ -397,7 +422,7 @@ func compactBuckets[IBC InternalBucketCount](buckets []IBC, spans []Span, maxEmp iBucket = int(spans[0].Length) if deltaBuckets { currentBucketAbsolute = 0 - for _, bucket := range buckets[:iBucket] { + for _, bucket := range primaryBuckets[:iBucket] { currentBucketAbsolute += bucket } } @@ -406,7 +431,7 @@ func compactBuckets[IBC InternalBucketCount](buckets []IBC, spans []Span, maxEmp if int(spans[iSpan].Offset) > maxEmptyBuckets { l := int(spans[iSpan].Length) if deltaBuckets { - for _, bucket := range buckets[iBucket : iBucket+l] { + for _, bucket := range primaryBuckets[iBucket : iBucket+l] { currentBucketAbsolute += bucket } } @@ -418,22 +443,28 @@ func compactBuckets[IBC InternalBucketCount](buckets []IBC, spans []Span, maxEmp offset := int(spans[iSpan].Offset) spans[iSpan-1].Length += uint32(offset) + spans[iSpan].Length spans = append(spans[:iSpan], spans[iSpan+1:]...) - newBuckets := make([]IBC, len(buckets)+offset) - copy(newBuckets, buckets[:iBucket]) - copy(newBuckets[iBucket+offset:], buckets[iBucket:]) + newPrimaryBuckets := make([]IBC, len(primaryBuckets)+offset) + copy(newPrimaryBuckets, primaryBuckets[:iBucket]) + copy(newPrimaryBuckets[iBucket+offset:], primaryBuckets[iBucket:]) if deltaBuckets { - newBuckets[iBucket] = -currentBucketAbsolute - newBuckets[iBucket+offset] += currentBucketAbsolute + newPrimaryBuckets[iBucket] = -currentBucketAbsolute + newPrimaryBuckets[iBucket+offset] += currentBucketAbsolute + } + primaryBuckets = newPrimaryBuckets + if compensationBuckets != nil { + newCompensationBuckets := make([]float64, len(compensationBuckets)+offset) + copy(newCompensationBuckets, compensationBuckets[:iBucket]) + copy(newCompensationBuckets[iBucket+offset:], compensationBuckets[iBucket:]) + compensationBuckets = newCompensationBuckets } iBucket += offset - buckets = newBuckets - currentBucketAbsolute = buckets[iBucket] + currentBucketAbsolute = primaryBuckets[iBucket] // Note that with many merges, it would be more efficient to // first record all the chunks of empty buckets to insert and // then do it in one go through all the buckets. } - return buckets, spans + return primaryBuckets, compensationBuckets, spans } func checkHistogramSpans(spans []Span, numBuckets int) error { diff --git a/model/histogram/histogram.go b/model/histogram/histogram.go index 5be60174fc..6ed02aed57 100644 --- a/model/histogram/histogram.go +++ b/model/histogram/histogram.go @@ -349,11 +349,11 @@ func allEmptySpans(s []Span) bool { // Compact works like FloatHistogram.Compact. See there for detailed // explanations. 
func (h *Histogram) Compact(maxEmptyBuckets int) *Histogram { - h.PositiveBuckets, h.PositiveSpans = compactBuckets( - h.PositiveBuckets, h.PositiveSpans, maxEmptyBuckets, true, + h.PositiveBuckets, _, h.PositiveSpans = compactBuckets( + h.PositiveBuckets, nil, h.PositiveSpans, maxEmptyBuckets, true, ) - h.NegativeBuckets, h.NegativeSpans = compactBuckets( - h.NegativeBuckets, h.NegativeSpans, maxEmptyBuckets, true, + h.NegativeBuckets, _, h.NegativeSpans = compactBuckets( + h.NegativeBuckets, nil, h.NegativeSpans, maxEmptyBuckets, true, ) return h } diff --git a/promql/engine.go b/promql/engine.go index afe82bc38f..cb27af3f46 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -50,6 +50,7 @@ import ( "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/util/annotations" "github.com/prometheus/prometheus/util/features" + "github.com/prometheus/prometheus/util/kahansum" "github.com/prometheus/prometheus/util/logging" "github.com/prometheus/prometheus/util/stats" "github.com/prometheus/prometheus/util/zeropool" @@ -3239,23 +3240,26 @@ func vectorElemBinop(op parser.ItemType, lhs, rhs float64, hlhs, hrhs *histogram } type groupedAggregation struct { - floatValue float64 - histogramValue *histogram.FloatHistogram - floatMean float64 - floatKahanC float64 // "Compensating value" for Kahan summation. - groupCount float64 - heap vectorByValueHeap + floatValue float64 + floatMean float64 + floatKahanC float64 // Compensation float for Kahan summation. + histogramValue *histogram.FloatHistogram + histogramMean *histogram.FloatHistogram + histogramKahanC *histogram.FloatHistogram // Compensation histogram for Kahan summation. + groupCount float64 + heap vectorByValueHeap // All bools together for better packing within the struct. - seen bool // Was this output groups seen in the input at this timestamp. - hasFloat bool // Has at least 1 float64 sample aggregated. - hasHistogram bool // Has at least 1 histogram sample aggregated. - incompatibleHistograms bool // If true, group has seen mixed exponential and custom buckets. - groupAggrComplete bool // Used by LIMITK to short-cut series loop when we've reached K elem on every group. - incrementalMean bool // True after reverting to incremental calculation of the mean value. - counterResetSeen bool // Counter reset hint CounterReset seen. Currently only used for histogram samples. - notCounterResetSeen bool // Counter reset hint NotCounterReset seen. Currently only used for histogram samples. - dropName bool // True if any sample in this group has DropName set. + seen bool // Was this output groups seen in the input at this timestamp. + hasFloat bool // Has at least 1 float64 sample aggregated. + hasHistogram bool // Has at least 1 histogram sample aggregated. + incompatibleHistograms bool // If true, group has seen mixed exponential and custom buckets. + groupAggrComplete bool // Used by LIMITK to short-cut series loop when we've reached K elem on every group. + floatIncrementalMean bool // True after reverting to incremental calculation for float-based mean value. + histogramIncrementalMean bool // True after reverting to incremental calculation for histogram-based mean value. + counterResetSeen bool // Counter reset hint CounterReset seen. Currently only used for histogram samples. + notCounterResetSeen bool // Counter reset hint NotCounterReset seen. Currently only used for histogram samples. + dropName bool // True if any sample in this group has DropName set. 
} // aggregation evaluates sum, avg, count, stdvar, stddev or quantile at one timestep on inputMatrix. @@ -3345,6 +3349,11 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix group.dropName = true } + var ( + nhcbBoundsReconciled bool + err error + ) + switch op { case parser.SUM: if h != nil { @@ -3356,7 +3365,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix case histogram.NotCounterReset: group.notCounterResetSeen = true } - _, _, nhcbBoundsReconciled, err := group.histogramValue.Add(h) + group.histogramKahanC, _, nhcbBoundsReconciled, err = group.histogramValue.KahanAdd(h, group.histogramKahanC) if err != nil { handleAggregationError(err, e, inputMatrix[si].Metric.Get(model.MetricNameLabel), &annos) group.incompatibleHistograms = true @@ -3370,18 +3379,13 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix // point in copying the histogram in that case. } else { group.hasFloat = true - group.floatValue, group.floatKahanC = kahanSumInc(f, group.floatValue, group.floatKahanC) + group.floatValue, group.floatKahanC = kahansum.Inc(f, group.floatValue, group.floatKahanC) } case parser.AVG: - // For the average calculation of histograms, we use - // incremental mean calculation without the help of - // Kahan summation (but this should change, see - // https://github.com/prometheus/prometheus/issues/14105 - // ). For floats, we improve the accuracy with the help - // of Kahan summation. For a while, we assumed that - // incremental mean calculation combined with Kahan - // summation (see + // We improve the accuracy with the help of Kahan summation. + // For a while, we assumed that incremental mean calculation + // combined with Kahan summation (see // https://stackoverflow.com/questions/61665473/is-it-beneficial-for-precision-to-calculate-the-incremental-mean-average // for inspiration) is generally the preferred solution. 
// However, it then turned out that direct mean @@ -3416,20 +3420,37 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix case histogram.NotCounterReset: group.notCounterResetSeen = true } - left := h.Copy().Div(group.groupCount) - right := group.histogramValue.Copy().Div(group.groupCount) - - toAdd, _, nhcbBoundsReconciled, err := left.Sub(right) - if err != nil { - handleAggregationError(err, e, inputMatrix[si].Metric.Get(model.MetricNameLabel), &annos) - group.incompatibleHistograms = true - continue + if !group.histogramIncrementalMean { + v := group.histogramValue.Copy() + var c *histogram.FloatHistogram + if group.histogramKahanC != nil { + c = group.histogramKahanC.Copy() + } + c, _, nhcbBoundsReconciled, err = v.KahanAdd(h, c) + if err != nil { + handleAggregationError(err, e, inputMatrix[si].Metric.Get(model.MetricNameLabel), &annos) + group.incompatibleHistograms = true + continue + } + if nhcbBoundsReconciled { + annos.Add(annotations.NewMismatchedCustomBucketsHistogramsInfo(e.Expr.PositionRange(), annotations.HistogramAgg)) + } + if !v.HasOverflow() { + group.histogramValue, group.histogramKahanC = v, c + break + } + group.histogramIncrementalMean = true + group.histogramMean = group.histogramValue.Copy().Div(group.groupCount - 1) + if group.histogramKahanC != nil { + group.histogramKahanC.Div(group.groupCount - 1) + } } - if nhcbBoundsReconciled { - annos.Add(annotations.NewMismatchedCustomBucketsHistogramsInfo(e.Expr.PositionRange(), annotations.HistogramAgg)) + q := (group.groupCount - 1) / group.groupCount + if group.histogramKahanC != nil { + group.histogramKahanC.Mul(q) } - - _, _, nhcbBoundsReconciled, err = group.histogramValue.Add(toAdd) + toAdd := h.Copy().Div(group.groupCount) + group.histogramKahanC, _, nhcbBoundsReconciled, err = group.histogramMean.Mul(q).KahanAdd(toAdd, group.histogramKahanC) if err != nil { handleAggregationError(err, e, inputMatrix[si].Metric.Get(model.MetricNameLabel), &annos) group.incompatibleHistograms = true @@ -3444,8 +3465,8 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix // point in copying the histogram in that case. } else { group.hasFloat = true - if !group.incrementalMean { - newV, newC := kahanSumInc(f, group.floatValue, group.floatKahanC) + if !group.floatIncrementalMean { + newV, newC := kahansum.Inc(f, group.floatValue, group.floatKahanC) if !math.IsInf(newV, 0) { // The sum doesn't overflow, so we propagate it to the // group struct and continue with the regular @@ -3456,12 +3477,12 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix // If we are here, we know that the sum _would_ overflow. So // instead of continue to sum up, we revert to incremental // calculation of the mean value from here on. 
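// Not part of this change: both the histogram branch above and the float
// branch below use the same recurrence. With q = (n-1)/n, the mean after n
// samples is mean_n = q*mean_{n-1} + x_n/n, and the Kahan compensation is
// scaled by q alongside the mean (Mul(q) and Div(count) in the histogram
// case). A minimal float sketch; incMean is a hypothetical helper:
func incMean(mean, c, x, n float64) (newMean, newC float64) {
	q := (n - 1) / n
	return kahansum.Inc(x/n, q*mean, q*c)
}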
- group.incrementalMean = true + group.floatIncrementalMean = true group.floatMean = group.floatValue / (group.groupCount - 1) group.floatKahanC /= group.groupCount - 1 } q := (group.groupCount - 1) / group.groupCount - group.floatMean, group.floatKahanC = kahanSumInc( + group.floatMean, group.floatKahanC = kahansum.Inc( f/group.groupCount, q*group.floatMean, q*group.floatKahanC, @@ -3536,8 +3557,24 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix case aggr.incompatibleHistograms: continue case aggr.hasHistogram: + if aggr.histogramIncrementalMean { + if aggr.histogramKahanC != nil { + aggr.histogramValue, _, _, _ = aggr.histogramMean.Add(aggr.histogramKahanC) + // Add can theoretically return ErrHistogramsIncompatibleSchema, but at + // this stage errors should not occur if earlier KahanAdd calls succeeded. + } else { + aggr.histogramValue = aggr.histogramMean + } + } else { + aggr.histogramValue.Div(aggr.groupCount) + if aggr.histogramKahanC != nil { + aggr.histogramValue, _, _, _ = aggr.histogramValue.Add(aggr.histogramKahanC.Div(aggr.groupCount)) + // Add can theoretically return ErrHistogramsIncompatibleSchema, but at + // this stage errors should not occur if earlier KahanAdd calls succeeded. + } + } aggr.histogramValue = aggr.histogramValue.Compact(0) - case aggr.incrementalMean: + case aggr.floatIncrementalMean: aggr.floatValue = aggr.floatMean + aggr.floatKahanC default: aggr.floatValue = aggr.floatValue/aggr.groupCount + aggr.floatKahanC/aggr.groupCount @@ -3565,6 +3602,11 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix case aggr.incompatibleHistograms: continue case aggr.hasHistogram: + if aggr.histogramKahanC != nil { + aggr.histogramValue, _, _, _ = aggr.histogramValue.Add(aggr.histogramKahanC) + // Add can theoretically return ErrHistogramsIncompatibleSchema, but at + // this stage errors should not occur if earlier KahanAdd calls succeeded. + } aggr.histogramValue.Compact(0) default: aggr.floatValue += aggr.floatKahanC diff --git a/promql/functions.go b/promql/functions.go index 04a3d55370..f02262ac40 100644 --- a/promql/functions.go +++ b/promql/functions.go @@ -33,6 +33,7 @@ import ( "github.com/prometheus/prometheus/promql/parser/posrange" "github.com/prometheus/prometheus/schema" "github.com/prometheus/prometheus/util/annotations" + "github.com/prometheus/prometheus/util/kahansum" ) // FunctionCall is the type of a PromQL function implementation @@ -801,10 +802,7 @@ func funcAvgOverTime(_ []Vector, matrixVal Matrix, args parser.Expressions, enh if len(firstSeries.Floats) > 0 && len(firstSeries.Histograms) > 0 { return enh.Out, annotations.New().Add(annotations.NewMixedFloatsHistogramsWarning(getMetricName(firstSeries.Metric), args[0].PositionRange())) } - // For the average calculation of histograms, we use incremental mean - // calculation without the help of Kahan summation (but this should - // change, see https://github.com/prometheus/prometheus/issues/14105 ). - // For floats, we improve the accuracy with the help of Kahan summation. + // We improve the accuracy with the help of Kahan summation. 
// For a while, we assumed that incremental mean calculation combined // with Kahan summation (see // https://stackoverflow.com/questions/61665473/is-it-beneficial-for-precision-to-calculate-the-incremental-mean-average @@ -847,23 +845,47 @@ func funcAvgOverTime(_ []Vector, matrixVal Matrix, args parser.Expressions, enh } }() - mean := s.Histograms[0].H.Copy() - trackCounterReset(mean) + var ( + sum = s.Histograms[0].H.Copy() + mean, kahanC *histogram.FloatHistogram + count float64 + incrementalMean bool + nhcbBoundsReconciled bool + err error + ) + trackCounterReset(sum) for i, h := range s.Histograms[1:] { trackCounterReset(h.H) - count := float64(i + 2) - left := h.H.Copy().Div(count) - right := mean.Copy().Div(count) - - toAdd, _, nhcbBoundsReconciled, err := left.Sub(right) - if err != nil { - return mean, err + count = float64(i + 2) + if !incrementalMean { + sumCopy := sum.Copy() + var cCopy *histogram.FloatHistogram + if kahanC != nil { + cCopy = kahanC.Copy() + } + cCopy, _, nhcbBoundsReconciled, err = sumCopy.KahanAdd(h.H, cCopy) + if err != nil { + return sumCopy.Div(count), err + } + if nhcbBoundsReconciled { + nhcbBoundsReconciledSeen = true + } + if !sumCopy.HasOverflow() { + sum, kahanC = sumCopy, cCopy + continue + } + incrementalMean = true + mean = sum.Copy().Div(count - 1) + if kahanC != nil { + kahanC.Div(count - 1) + } } - if nhcbBoundsReconciled { - nhcbBoundsReconciledSeen = true + q := (count - 1) / count + if kahanC != nil { + kahanC.Mul(q) } - - _, _, nhcbBoundsReconciled, err = mean.Add(toAdd) + toAdd := h.H.Copy().Div(count) + kahanC, _, nhcbBoundsReconciled, err = mean.Mul(q).KahanAdd(toAdd, kahanC) if err != nil { return mean, err } @@ -871,7 +893,18 @@ func funcAvgOverTime(_ []Vector, matrixVal Matrix, args parser.Expressions, enh nhcbBoundsReconciledSeen = true } } - return mean, nil + if incrementalMean { + if kahanC != nil { + _, _, _, err := mean.Add(kahanC) + return mean, err + } + return mean, nil + } + if kahanC != nil { + _, _, _, err := sum.Div(count).Add(kahanC.Div(count)) + return sum, err + } + return sum.Div(count), nil }) if err != nil { if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) { @@ -890,7 +923,7 @@ func funcAvgOverTime(_ []Vector, matrixVal Matrix, args parser.Expressions, enh for i, f := range s.Floats[1:] { count = float64(i + 2) if !incrementalMean { - newSum, newC := kahanSumInc(f.F, sum, kahanC) + newSum, newC := kahansum.Inc(f.F, sum, kahanC) // Perform regular mean calculation as long as // the sum doesn't overflow. 
if !math.IsInf(newSum, 0) { @@ -904,7 +937,7 @@ func funcAvgOverTime(_ []Vector, matrixVal Matrix, args parser.Expressions, enh kahanC /= (count - 1) } q := (count - 1) / count - mean, kahanC = kahanSumInc(f.F/count, q*mean, q*kahanC) + mean, kahanC = kahansum.Inc(f.F/count, q*mean, q*kahanC) } if incrementalMean { return mean + kahanC @@ -1145,9 +1178,23 @@ func funcSumOverTime(_ []Vector, matrixVal Matrix, args parser.Expressions, enh sum := s.Histograms[0].H.Copy() trackCounterReset(sum) + var ( + comp *histogram.FloatHistogram + nhcbBoundsReconciled bool + err error + ) for _, h := range s.Histograms[1:] { trackCounterReset(h.H) - _, _, nhcbBoundsReconciled, err := sum.Add(h.H) + comp, _, nhcbBoundsReconciled, err = sum.KahanAdd(h.H, comp) + if err != nil { + return sum, err + } + if nhcbBoundsReconciled { + nhcbBoundsReconciledSeen = true + } + } + if comp != nil { + sum, _, nhcbBoundsReconciled, err = sum.Add(comp) if err != nil { return sum, err } @@ -1155,7 +1202,7 @@ func funcSumOverTime(_ []Vector, matrixVal Matrix, args parser.Expressions, enh nhcbBoundsReconciledSeen = true } } - return sum, nil + return sum, err }) if err != nil { if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) { @@ -1167,7 +1214,7 @@ func funcSumOverTime(_ []Vector, matrixVal Matrix, args parser.Expressions, enh return aggrOverTime(matrixVal, enh, func(s Series) float64 { var sum, c float64 for _, f := range s.Floats { - sum, c = kahanSumInc(f.F, sum, c) + sum, c = kahansum.Inc(f.F, sum, c) } if math.IsInf(sum, 0) { return sum @@ -1220,8 +1267,8 @@ func varianceOverTime(matrixVal Matrix, args parser.Expressions, enh *EvalNodeHe for _, f := range s.Floats { count++ delta := f.F - (mean + cMean) - mean, cMean = kahanSumInc(delta/count, mean, cMean) - aux, cAux = kahanSumInc(delta*(f.F-(mean+cMean)), aux, cAux) + mean, cMean = kahansum.Inc(delta/count, mean, cMean) + aux, cAux = kahansum.Inc(delta*(f.F-(mean+cMean)), aux, cAux) } variance := (aux + cAux) / count if varianceToResult == nil { @@ -1434,24 +1481,6 @@ func funcTimestamp(vectorVals []Vector, _ Matrix, _ parser.Expressions, enh *Eva return enh.Out, nil } -// We get incorrect results if this function is inlined; see https://github.com/prometheus/prometheus/issues/16714. -// -//go:noinline -func kahanSumInc(inc, sum, c float64) (newSum, newC float64) { - t := sum + inc - switch { - case math.IsInf(t, 0): - c = 0 - - // Using Neumaier improvement, swap if next term larger than sum. - case math.Abs(sum) >= math.Abs(inc): - c += (sum - t) + inc - default: - c += (inc - t) + sum - } - return t, c -} - // linearRegression performs a least-square linear regression analysis on the // provided SamplePairs. It returns the slope, and the intercept value at the // provided time. 
@@ -1474,10 +1503,10 @@ func linearRegression(samples []FPoint, interceptTime int64) (slope, intercept f } n += 1.0 x := float64(sample.T-interceptTime) / 1e3 - sumX, cX = kahanSumInc(x, sumX, cX) - sumY, cY = kahanSumInc(sample.F, sumY, cY) - sumXY, cXY = kahanSumInc(x*sample.F, sumXY, cXY) - sumX2, cX2 = kahanSumInc(x*x, sumX2, cX2) + sumX, cX = kahansum.Inc(x, sumX, cX) + sumY, cY = kahansum.Inc(sample.F, sumY, cY) + sumXY, cXY = kahansum.Inc(x*sample.F, sumXY, cXY) + sumX2, cX2 = kahansum.Inc(x*x, sumX2, cX2) } if constY { if math.IsInf(initY, 0) { @@ -1613,7 +1642,7 @@ func histogramVariance(vectorVals []Vector, enh *EvalNodeHelper, varianceToResul } } delta := val - mean - variance, cVariance = kahanSumInc(bucket.Count*delta*delta, variance, cVariance) + variance, cVariance = kahansum.Inc(bucket.Count*delta*delta, variance, cVariance) } variance += cVariance variance /= h.Count diff --git a/promql/functions_internal_test.go b/promql/functions_internal_test.go index 9efd9c3c2e..cd170823a8 100644 --- a/promql/functions_internal_test.go +++ b/promql/functions_internal_test.go @@ -24,6 +24,7 @@ import ( "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql/parser/posrange" + "github.com/prometheus/prometheus/util/kahansum" ) func TestHistogramRateCounterResetHint(t *testing.T) { @@ -79,7 +80,7 @@ func TestKahanSumInc(t *testing.T) { runTest := func(t *testing.T, a, b, expected float64) { t.Run(fmt.Sprintf("%v + %v = %v", a, b, expected), func(t *testing.T) { - sum, c := kahanSumInc(b, a, 0) + sum, c := kahansum.Inc(b, a, 0) result := sum + c if math.IsNaN(expected) { diff --git a/promql/promqltest/testdata/aggregators.test b/promql/promqltest/testdata/aggregators.test index 576b36868f..a3dc61dcff 100644 --- a/promql/promqltest/testdata/aggregators.test +++ b/promql/promqltest/testdata/aggregators.test @@ -687,6 +687,11 @@ load 10s eval instant at 1m sum(data{test="ten"}) {} 10 +# Plain addition doesn't use Kahan summation, so operations involving very large magnitudes +# (±1e+100) lose precision. The smaller values are absorbed, leading to an incorrect result. +# eval instant at 1m sum(data{test="ten",point="a"}) + sum(data{test="ten",point="b"}) + sum(data{test="ten",point="c"}) + sum(data{test="ten",point="d"}) +# {} 10 + eval instant at 1m avg(data{test="ten"}) {} 2.5 diff --git a/promql/promqltest/testdata/native_histograms.test b/promql/promqltest/testdata/native_histograms.test index d66400f787..07352eb59a 100644 --- a/promql/promqltest/testdata/native_histograms.test +++ b/promql/promqltest/testdata/native_histograms.test @@ -1388,22 +1388,28 @@ clear # Test native histograms with sum, count, avg. 
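+# The ±1e100 sums below exercise the Kahan compensation in histogram addition: in plain
+# float64 arithmetic, 1e100 + 3.1 evaluates to 1e100, so the 3.1 would be lost without a
+# compensation term.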
 load 10m
-  histogram_sum{idx="0"} {{schema:0 count:25 sum:1234.5 z_bucket:4 z_bucket_w:0.001 buckets:[1 2 0 1 1] n_buckets:[2 4 0 0 1 9]}}x1
-  histogram_sum{idx="1"} {{schema:0 count:41 sum:2345.6 z_bucket:5 z_bucket_w:0.001 buckets:[1 3 1 2 1 1 1] n_buckets:[0 1 4 2 7 0 0 0 0 5 5 2]}}x1
-  histogram_sum{idx="2"} {{schema:0 count:41 sum:1111.1 z_bucket:5 z_bucket_w:0.001 buckets:[1 3 1 2 1 1 1] n_buckets:[0 1 4 2 7 0 0 0 0 5 5 2]}}x1
-  histogram_sum{idx="3"} {{schema:1 count:0}}x1
+  histogram_sum{idx="0"} {{schema:0 count:25 sum:3.1 z_bucket:4 z_bucket_w:0.001 buckets:[1 2 0 1 1] n_buckets:[2 4 0 0 1 9]}}x1
+  histogram_sum{idx="1"} {{schema:0 count:41 sum:1e100 z_bucket:5 z_bucket_w:0.001 buckets:[1 3 1 2 1 1 1] n_buckets:[0 1 4 2 7 0 0 0 0 5 5 2]}}x1
+  histogram_sum{idx="2"} {{schema:0 count:41 sum:-1e100 z_bucket:5 z_bucket_w:0.001 buckets:[1 3 1 2 1 1 1] n_buckets:[0 1 4 2 7 0 0 0 0 5 5 2]}}x1
+  histogram_sum{idx="3"} {{schema:1 count:0 sum:1.3 z_bucket:3 z_bucket_w:0.001 buckets:[2 4 2 3 2 2] n_buckets:[1 2 5 3 8 1 1 1 1 6 3]}}x1
   histogram_sum_float{idx="0"} 42.0x1
 
 eval instant at 10m sum(histogram_sum)
   expect no_warn
-  {} {{schema:0 count:107 sum:4691.2 z_bucket:14 z_bucket_w:0.001 buckets:[3 8 2 5 3 2 2] n_buckets:[2 6 8 4 15 9 0 0 0 10 10 4]}}
+  {} {{schema:0 count:107 sum:4.4 z_bucket:17 z_bucket_w:0.001 buckets:[5 14 7 7 3 2 2] n_buckets:[3 13 19 6 17 18 0 0 0 10 10 4]}}
 
 eval instant at 10m sum({idx="0"})
   expect warn
 
-eval instant at 10m sum(histogram_sum{idx="0"} + ignoring(idx) histogram_sum{idx="1"} + ignoring(idx) histogram_sum{idx="2"} + ignoring(idx) histogram_sum{idx="3"})
+eval instant at 10m sum(histogram_sum{idx="0"} + ignoring(idx) histogram_sum{idx="3"})
   expect no_warn
-  {} {{schema:0 count:107 sum:4691.2 z_bucket:14 z_bucket_w:0.001 buckets:[3 8 2 5 3 2 2] n_buckets:[2 6 8 4 15 9 0 0 0 10 10 4]}}
+  {} {{schema:0 count:25 sum:4.4 z_bucket:7 z_bucket_w:0.001 buckets:[3 8 5 3 1] n_buckets:[3 11 11 2 3 18]}}
+
+# Plain addition doesn't use Kahan summation, so operations involving very large magnitudes
+# (±1e+100) lose precision. The smaller values are absorbed, leading to an incorrect result.
+# eval instant at 10m sum(histogram_sum{idx="0"} + ignoring(idx) histogram_sum{idx="1"} + ignoring(idx) histogram_sum{idx="2"} + ignoring(idx) histogram_sum{idx="3"})
+# expect no_warn
+# {} {{schema:0 count:107 sum:4.4 z_bucket:14 z_bucket_w:0.001 buckets:[3 8 2 5 3 2 2] n_buckets:[2 6 8 4 15 9 0 0 0 10 10 4]}}
 
 eval instant at 10m count(histogram_sum)
   expect no_warn
@@ -1411,13 +1417,63 @@ eval instant at 10m count(histogram_sum)
 
 eval instant at 10m avg(histogram_sum)
   expect no_warn
-  {} {{schema:0 count:26.75 sum:1172.8 z_bucket:3.5 z_bucket_w:0.001 buckets:[0.75 2 0.5 1.25 0.75 0.5 0.5] n_buckets:[0.5 1.5 2 1 3.75 2.25 0 0 0 2.5 2.5 1]}}
+  {} {{schema:0 count:26.75 sum:1.1 z_bucket:4.25 z_bucket_w:0.001 buckets:[1.25 3.5 1.75 1.75 0.75 0.5 0.5] n_buckets:[0.75 3.25 4.75 1.5 4.25 4.5 0 0 0 2.5 2.5 1]}}
+
+clear
+
+# Test native histograms with incremental avg calculation.
+# The very large values involved trigger the incremental avg calculation, as summing them
+# directly would overflow float64.
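+# Once the running sum overflows, funcAvgOverTime switches from direct summation to the
+# incremental form mean_n = mean_{n-1}*(n-1)/n + x_n/n, carrying the Kahan compensation
+# histogram along.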
+load 10m
+  histogram_avg_incremental{idx="0"} {{schema:0 count:1.7976931348623157e+308 sum:5.30921651659898 z_bucket:1.78264e50 z_bucket_w:0.001 buckets:[1.78264E+50 1.78264E+215 1.78264E+219 3363.5121756487] n_buckets:[1178.20696291113 731.697776280323 715.201503759399 1386.11378876781 855.572553278132]}}x1
+  histogram_avg_incremental{idx="1"} {{schema:0 count:1e308 sum:0.961118537914768 z_bucket:0.76342771 z_bucket_w:0.001 buckets:[0.76342771 0.76342771 0.76342771 195.70084087969] n_buckets:[421.30382970055 0 450441.779]}}x1
+  histogram_avg_incremental{idx="2"} {{schema:0 count:1e-6 sum:1.62091361305318 z_bucket:1.9592258 z_bucket_w:0.001 buckets:[1.9592258 1.9592258 1.9592258 1135.74692279] n_buckets:[0 4504.41779 588.599358265103 40.3760942760943]}}x1
+  histogram_avg_incremental{idx="3"} {{schema:0 count:1e-6 sum:0.865089463758091 z_bucket:7.69805412 z_bucket_w:0.001 buckets:[2.258E+220 2.258E+220 2.3757689E+217 1078.68071312804] n_buckets:[349.905284031261 0 0 0.161173466838949 588.599358]}}x1
+  histogram_avg_incremental{idx="4"} {{schema:0 count:1e-6 sum:0.323055185914577 z_bucket:458.90154 z_bucket_w:0.001 buckets:[7.69805412 7.69805412 2.258E+220 3173.28218135701]}}x1
+  histogram_avg_incremental{idx="5"} {{schema:0 count:1e-6 sum:0.951811357687154 z_bucket:1.78264e50 z_bucket_w:0.001 buckets:[458.90154 458.90154 7.69805412 2178.35] n_buckets:[2054.92644438789 844.560108898123]}}x1
+  histogram_avg_incremental{idx="6"} {{schema:0 count:1e-6 sum:0 z_bucket:5 z_bucket_w:0.001 buckets:[0 0 1.78264E+219 376.770478890989]}}x1
+  histogram_avg_incremental{idx="7"} {{schema:0 count:1e-6 sum:0 z_bucket:0 z_bucket_w:0.001 buckets:[0 0 458.90154 250325.5] n_buckets:[0 0.0000000011353 0 608.697257]}}x1
+# This test fails due to float64 rounding in the incremental average calculation.
+# For large intermediate means (e.g. ~1e99), multiplying by a fractional weight like (n-1)/n
+# produces values such as 2.0000000000000002e99 instead of the mathematically exact 2e99.
+# While the relative error is tiny, subtracting nearly equal high-magnitude values later
+# results in a large absolute error. The outcome also depends on the (effectively random) order
+# in which input series are processed, which makes the test flaky.
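+# For example, 2.0000000000000002e99 differs from 2e99 by roughly one ulp, which at this
+# magnitude is on the order of 1e83; subtracting the two therefore leaves a residue of that
+# size instead of 0.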
+# histogram_avg_incremental_2{idx="0"} {{schema:0 count:1.7976931348623157e+308 sum:5.3 z_bucket:1.78264e50 z_bucket_w:0.001 buckets:[1.78264E+50 1.78264E+215 1.78264E+219 3363.5121756487] n_buckets:[1178.20696291113 731.697776280323 715.201503759399 1386.11378876781 855.572553278132]}}x1
+# histogram_avg_incremental_2{idx="1"} {{schema:0 count:1e308 sum:1e100 z_bucket:0.76342771 z_bucket_w:0.001 buckets:[0.76342771 0.76342771 0.76342771 195.70084087969] n_buckets:[421.30382970055 0 450441.779]}}x1
+# histogram_avg_incremental_2{idx="2"} {{schema:0 count:1e-6 sum:1 z_bucket:1.9592258 z_bucket_w:0.001 buckets:[1.9592258 1.9592258 1.9592258 1135.74692279] n_buckets:[0 4504.41779 588.599358265103 40.3760942760943]}}x1
+# histogram_avg_incremental_2{idx="3"} {{schema:0 count:1e-6 sum:-1e100 z_bucket:7.69805412 z_bucket_w:0.001 buckets:[2.258E+220 2.258E+220 2.3757689E+217 1078.68071312804] n_buckets:[349.905284031261 0 0 0.161173466838949 588.599358]}}x1
+# histogram_avg_incremental_2{idx="4"} {{schema:0 count:1e-6 sum:1 z_bucket:458.90154 z_bucket_w:0.001 buckets:[7.69805412 7.69805412 2.258E+220 3173.28218135701]}}x1
+# histogram_avg_incremental_2{idx="5"} {{schema:0 count:1e-6 sum:1 z_bucket:1.78264e50 z_bucket_w:0.001 buckets:[458.90154 458.90154 7.69805412 2178.35] n_buckets:[2054.92644438789 844.560108898123]}}x1
+# histogram_avg_incremental_2{idx="6"} {{schema:0 count:1e-6 sum:0 z_bucket:5 z_bucket_w:0.001 buckets:[0 0 1.78264E+219 376.770478890989]}}x1
+# histogram_avg_incremental_2{idx="7"} {{schema:0 count:1e-6 sum:0 z_bucket:0 z_bucket_w:0.001 buckets:[0 0 458.90154 250325.5] n_buckets:[0 0.0000000011353 0 608.697257]}}x1
+
+eval instant at 10m avg(histogram_avg_incremental)
+  {} {{schema:0 count:3.497116418577895e+307 sum:1.2539005843658437 z_bucket:4.4566e49 z_bucket_w:0.001 buckets:[2.8225e+219 2.822522283e+219 3.271129711125e+219 32728.442914086805] n_buckets:[500.5428151288539 760.0844593974477 56468.19748275306 254.4185391888429 180.5214889097665]}}
+
+# This test doesn't work; see the load section above for the reasoning.
+# eval instant at 10m avg(histogram_avg_incremental_2)
+# {} {{schema:0 count:3.497116418577895e+307 sum:1.0375 z_bucket:4.4566e49 z_bucket_w:0.001 buckets:[2.8225e+219 2.822522283e+219 3.271129711125e+219 32728.442914086805] n_buckets:[500.5428151288539 760.0844593974477 56468.19748275306 254.4185391888429 180.5214889097665]}}
 
 clear
 
 # Test native histograms with sum_over_time, avg_over_time.
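+# The series added below mix ordinary histograms with extreme sums (±1e100, ±1e50) and
+# counts close to MaxFloat64, exercising both the Kahan-compensated sum_over_time path and
+# the overflow-triggered incremental avg_over_time path.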
load 1m histogram_sum_over_time {{schema:0 count:25 sum:1234.5 z_bucket:4 z_bucket_w:0.001 buckets:[1 2 0 1 1] n_buckets:[2 4 0 0 1 9]}} {{schema:0 count:41 sum:2345.6 z_bucket:5 z_bucket_w:0.001 buckets:[1 3 1 2 1 1 1] n_buckets:[0 1 4 2 7 0 0 0 0 5 5 2]}} {{schema:0 count:41 sum:1111.1 z_bucket:5 z_bucket_w:0.001 buckets:[1 3 1 2 1 1 1] n_buckets:[0 1 4 2 7 0 0 0 0 5 5 2]}} {{schema:1 count:0}} + histogram_sum_over_time_2 {{schema:0 count:1e10 sum:5.30921651659898 z_bucket:1.78264e50 z_bucket_w:0.001 buckets:[1.78264E+50 1.78264E+215 1.78264E+219 3363.5121756487] n_buckets:[1178.20696291113 731.697776280323 715.201503759399 1386.11378876781 855.572553278132]}} {{schema:0 count:1e-6 sum:0.961118537914768 z_bucket:0.76342771 z_bucket_w:0.001 buckets:[0.76342771 0.76342771 0.76342771 195.70084087969] n_buckets:[421.30382970055 0 450441.779]}} {{schema:0 count:1e-6 sum:1.62091361305318 z_bucket:1.9592258 z_bucket_w:0.001 buckets:[1.9592258 1.9592258 1.9592258 1135.74692279] n_buckets:[0 4504.41779 588.599358265103 40.3760942760943]}} {{schema:0 count:1e-6 sum:0.865089463758091 z_bucket:7.69805412 z_bucket_w:0.001 buckets:[2.258E+220 2.258E+220 2.3757689E+217 1078.68071312804] n_buckets:[349.905284031261 0 0 0.161173466838949 588.599358]}} {{schema:0 count:1e-6 sum:0.323055185914577 z_bucket:458.90154 z_bucket_w:0.001 buckets:[7.69805412 7.69805412 2.258E+220 3173.28218135701]}} {{schema:0 count:1e-6 sum:0.951811357687154 z_bucket:1.78264e50 z_bucket_w:0.001 buckets:[458.90154 458.90154 7.69805412 2178.35] n_buckets:[2054.92644438789 844.560108898123]}} {{schema:0 count:1e-6 sum:0 z_bucket:5 z_bucket_w:0.001 buckets:[0 0 1.78264E+219 376.770478890989]}} {{schema:0 count:1e-6 sum:0 z_bucket:0 z_bucket_w:0.001 buckets:[0 0 458.90154 250325.5] n_buckets:[0 0.0000000011353 0 608.697257]}} + histogram_sum_over_time_3 {{schema:0 count:1 sum:1}} {{schema:0 count:2 sum:1e100}} {{schema:0 count:3 sum:1}} {{schema:0 count:4 sum:-1e100}} + histogram_sum_over_time_4 {{schema:0 count:1 sum:5.3}} {{schema:0 count:2 sum:1e100}} {{schema:0 count:3 sum:1}} {{schema:0 count:4 sum:-1e100}} {{schema:0 count:5 sum:2}} {{schema:0 count:6 sum:1e50}} {{schema:0 count:7 sum:-1e50}} + histogram_sum_over_time_incremental {{schema:0 count:1.7976931348623157e+308 sum:5.30921651659898 z_bucket:1.78264e50 z_bucket_w:0.001 buckets:[1.78264E+50 1.78264E+215 1.78264E+219 3363.5121756487] n_buckets:[1178.20696291113 731.697776280323 715.201503759399 1386.11378876781 855.572553278132]}} {{schema:0 count:1e308 sum:0.961118537914768 z_bucket:0.76342771 z_bucket_w:0.001 buckets:[0.76342771 0.76342771 0.76342771 195.70084087969] n_buckets:[421.30382970055 0 450441.779]}} {{schema:0 count:1e-6 sum:1.62091361305318 z_bucket:1.9592258 z_bucket_w:0.001 buckets:[1.9592258 1.9592258 1.9592258 1135.74692279] n_buckets:[0 4504.41779 588.599358265103 40.3760942760943]}} {{schema:0 count:1e-6 sum:0.865089463758091 z_bucket:7.69805412 z_bucket_w:0.001 buckets:[2.258E+220 2.258E+220 2.3757689E+217 1078.68071312804] n_buckets:[349.905284031261 0 0 0.161173466838949 588.599358]}} {{schema:0 count:1e-6 sum:0.323055185914577 z_bucket:458.90154 z_bucket_w:0.001 buckets:[7.69805412 7.69805412 2.258E+220 3173.28218135701]}} {{schema:0 count:1e-6 sum:0.951811357687154 z_bucket:1.78264e50 z_bucket_w:0.001 buckets:[458.90154 458.90154 7.69805412 2178.35] n_buckets:[2054.92644438789 844.560108898123]}} {{schema:0 count:1e-6 sum:0 z_bucket:5 z_bucket_w:0.001 buckets:[0 0 1.78264E+219 376.770478890989]}} {{schema:0 count:1e-6 sum:0 z_bucket:0 
z_bucket_w:0.001 buckets:[0 0 458.90154 250325.5] n_buckets:[0 0.0000000011353 0 608.697257]}}
+  histogram_sum_over_time_incremental_2 {{schema:0 count:1.7976931348623157e+308 sum:5.3}} {{schema:0 count:1e308 sum:1e100}} {{schema:0 count:1e-6 sum:1}} {{schema:0 count:1e-6 sum:-1e100}} {{schema:0 count:1e-6 sum:2}} {{schema:0 count:1e-6 sum:0}} {{schema:0 count:1e-6 sum:0}}
+  histogram_sum_over_time_incremental_3 {{schema:0 count:1.7976931348623157e+308 sum:5.3}} {{schema:0 count:1e308 sum:1e100}} {{schema:0 count:1e-6 sum:-1e100}} {{schema:0 count:1e-6 sum:1}} {{schema:0 count:1e-6 sum:1e100}} {{schema:0 count:1e-6 sum:-1e100}} {{schema:0 count:1e-6 sum:0}}
+  histogram_sum_over_time_incremental_4 {{schema:0 count:1.7976931348623157e+308 sum:5.3}} {{schema:0 count:1e308 sum:1e100}} {{schema:0 count:1e-6 sum:-1e100}} {{schema:0 count:1e-6 sum:1}} {{schema:0 count:1e-6 sum:1e50}} {{schema:0 count:1e-6 sum:-1e50}} {{schema:0 count:1e-6 sum:0}}
+  histogram_sum_over_time_incremental_6 {{schema:0 count:1.7976931348623157e+308 sum:1}} {{schema:0 count:1e308 sum:1e100}} {{schema:0 count:1e-6 sum:1}} {{schema:0 count:1e-6 sum:-1e100}}
+# Kahan summation only compensates reliably across two magnitude scales. In the following
+# inputs, the series contain three distinct magnitude groups (≈1, ≈1e50, and ≈1e100). When
+# these magnitudes are interleaved, the rounding error can't be fully compensated, causing
+# smaller values to be lost: the compensation term is itself a single float64, so it can only
+# track one magnitude scale below the running sum, and a third scale is absorbed within it.
+# However, when values are ordered so that cancellation within one magnitude group occurs
+# first, followed by cancellation of the next group, the outcome remains accurate.
+# histogram_sum_over_time_5 {{schema:0 count:1 sum:5.3}} {{schema:0 count:2 sum:1e100}} {{schema:0 count:3 sum:1}} {{schema:0 count:4 sum:1e50}} {{schema:0 count:5 sum:2}} {{schema:0 count:6 sum:-1e100}} {{schema:0 count:7 sum:-1e50}}
+# histogram_sum_over_time_incremental_5 {{schema:0 count:1.7976931348623157e+308 sum:5.3}} {{schema:0 count:1e308 sum:1e100}} {{schema:0 count:1e-6 sum:1e50}} {{schema:0 count:1e-6 sum:1}} {{schema:0 count:1e-6 sum:-1e100}} {{schema:0 count:1e-6 sum:-1e50}} {{schema:0 count:1e-6 sum:0}}
 
 eval instant at 3m sum_over_time(histogram_sum_over_time[4m:1m])
   {} {{schema:0 count:107 sum:4691.2 z_bucket:14 z_bucket_w:0.001 buckets:[3 8 2 5 3 2 2] n_buckets:[2 6 8 4 15 9 0 0 0 10 10 4]}}
@@ -1425,6 +1481,68 @@ eval instant at 3m sum_over_time(histogram_sum_over_time[4m:1m])
 eval instant at 3m avg_over_time(histogram_sum_over_time[4m:1m])
   {} {{schema:0 count:26.75 sum:1172.8 z_bucket:3.5 z_bucket_w:0.001 buckets:[0.75 2 0.5 1.25 0.75 0.5 0.5] n_buckets:[0.5 1.5 2 1 3.75 2.25 0 0 0 2.5 2.5 1]}}
 
+eval instant at 7m sum_over_time(histogram_sum_over_time_2[8m:1m])
+  {} {{schema:0 count:10000000000.000008 sum:10.03120467492675 z_bucket:3.56528e+50 z_bucket_w:0.001 buckets:[2.258e+220 2.2580178264e+220 2.6169037689e+220 261827.54331269444] n_buckets:[4004.342521030831 6080.675675179582 451745.57986202446 2035.3483135107433 1444.171911278132]}}
+
+eval instant at 7m avg_over_time(histogram_sum_over_time_2[8m:1m])
+  {} {{schema:0 count:1250000000.000001 sum:1.2539005843658437 z_bucket:4.4566e49 z_bucket_w:0.001 buckets:[2.8225e+219 2.822522283e+219 3.271129711125e+219 32728.442914086805] n_buckets:[500.5428151288539 760.0844593974477 56468.19748275306 254.4185391888429 180.5214889097665]}}
+
+eval instant at 3m sum_over_time(histogram_sum_over_time_3[4m:1m])
+  {} {{schema:0 count:10 sum:2}}
+
+eval instant at 3m avg_over_time(histogram_sum_over_time_3[4m:1m])
+  {} {{schema:0 count:2.5 sum:0.5}}
+
+eval instant at 6m sum_over_time(histogram_sum_over_time_4[7m:1m])
+  {} {{schema:0 count:28 sum:8.3}}
+
+eval instant at 6m avg_over_time(histogram_sum_over_time_4[7m:1m])
+  {} {{schema:0 count:4 sum:1.1857142857142857}}
+
+# These tests don't work; see the load section above for the reasoning.
+# eval instant at 6m sum_over_time(histogram_sum_over_time_5[7m:1m])
+# {} {{schema:0 count:28 sum:8.3}}
+#
+# eval instant at 6m avg_over_time(histogram_sum_over_time_5[7m:1m])
+# {} {{schema:0 count:4 sum:1.1857142857142857}}
+
+eval instant at 7m sum_over_time(histogram_sum_over_time_incremental[8m:1m])
+  {} {{schema:0 count:Inf sum:10.03120467492675 z_bucket:3.56528e+50 z_bucket_w:0.001 buckets:[2.258e+220 2.2580178264e+220 2.6169037689e+220 261827.54331269444] n_buckets:[4004.342521030831 6080.675675179582 451745.57986202446 2035.3483135107433 1444.171911278132]}}
+
+eval instant at 7m avg_over_time(histogram_sum_over_time_incremental[8m:1m])
+  {} {{schema:0 count:3.497116418577895e+307 sum:1.2539005843658437 z_bucket:4.4566e49 z_bucket_w:0.001 buckets:[2.8225e+219 2.822522283e+219 3.271129711125e+219 32728.442914086805] n_buckets:[500.5428151288539 760.0844593974477 56468.19748275306 254.4185391888429 180.5214889097665]}}
+
+eval instant at 6m sum_over_time(histogram_sum_over_time_incremental_2[7m:1m])
+  {} {{schema:0 count:Inf sum:8.3}}
+
+eval instant at 6m avg_over_time(histogram_sum_over_time_incremental_2[7m:1m])
+  {} {{schema:0 count:3.9967044783747367e+307 sum:1.1857142857142857}}
+
+eval instant at 6m sum_over_time(histogram_sum_over_time_incremental_3[7m:1m])
+  {} {{schema:0 count:Inf sum:6.3}}
+
+eval instant at 6m avg_over_time(histogram_sum_over_time_incremental_3[7m:1m])
+  {} {{schema:0 count:3.9967044783747367e+307 sum:0.9}}
+
+eval instant at 6m sum_over_time(histogram_sum_over_time_incremental_4[7m:1m])
+  {} {{schema:0 count:Inf sum:6.3}}
+
+eval instant at 6m avg_over_time(histogram_sum_over_time_incremental_4[7m:1m])
+  {} {{schema:0 count:3.9967044783747367e+307 sum:0.9}}
+
+# These tests don't work; see the load section above for the reasoning.
+# eval instant at 6m sum_over_time(histogram_sum_over_time_incremental_5[7m:1m])
+# {} {{schema:0 count:Inf sum:6.3}}
+#
+# eval instant at 6m avg_over_time(histogram_sum_over_time_incremental_5[7m:1m])
+# {} {{schema:0 count:3.9967044783747367e+307 sum:0.9}}
+
+eval instant at 3m sum_over_time(histogram_sum_over_time_incremental_6[4m:1m])
+  {} {{schema:0 count:Inf sum:2}}
+
+eval instant at 3m avg_over_time(histogram_sum_over_time_incremental_6[4m:1m])
+  {} {{schema:0 count:6.99423283715579e+307 sum:0.5}}
+
 clear
 
 # Test native histograms with sub operator.
diff --git a/util/kahansum/kahansum.go b/util/kahansum/kahansum.go
new file mode 100644
index 0000000000..d55defcb29
--- /dev/null
+++ b/util/kahansum/kahansum.go
@@ -0,0 +1,39 @@
+// Copyright 2024 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kahansum
+
+import "math"
+
+// Inc adds inc to the running sum sum with compensation term c, using the Kahan
+// summation algorithm with the Neumaier improvement. It returns the updated sum
+// and compensation term.
+// We get incorrect results if this function is inlined; see https://github.com/prometheus/prometheus/issues/16714.
+//
+//go:noinline
+func Inc(inc, sum, c float64) (newSum, newC float64) {
+	t := sum + inc
+	switch {
+	case math.IsInf(t, 0):
+		c = 0
+
+	// Using the Neumaier improvement, swap if the next term is larger than the sum.
+	case math.Abs(sum) >= math.Abs(inc):
+		c += (sum - t) + inc
+	default:
+		c += (inc - t) + sum
+	}
+	return t, c
+}
+
+// Dec subtracts dec from the running sum sum with compensation term c. It is
+// equivalent to Inc with a negated increment.
+func Dec(dec, sum, c float64) (newSum, newC float64) {
+	return Inc(-dec, sum, c)
+}
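
As an illustrative aside (not part of the patch): a minimal, self-contained sketch of how kahansum.Inc keeps a compensated running sum. The values mirror the ±1e100 test data above; plain float64 summation absorbs the small terms, while carrying the compensation term recovers them.

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/util/kahansum"
)

func main() {
	values := []float64{3.1, 1e100, -1e100, 1.3}

	// Plain float64 summation: the small terms are absorbed by 1e100.
	plain := 0.0
	for _, v := range values {
		plain += v // 3.1 is lost when 1e100 is added
	}

	// Kahan (Neumaier) summation: the compensation term c retains what
	// the running sum cannot represent.
	var sum, c float64
	for _, v := range values {
		sum, c = kahansum.Inc(v, sum, c)
	}

	fmt.Println(plain)   // 1.3
	fmt.Println(sum + c) // 4.4
}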