@ -18,6 +18,8 @@ import (
"fmt"
"math"
"strings"
"github.com/prometheus/prometheus/util/kahansum"
)
// FloatHistogram is similar to Histogram but uses float64 for all
@ -353,7 +355,7 @@ func (h *FloatHistogram) Add(other *FloatHistogram) (res *FloatHistogram, counte
}
counterResetCollision = h . adjustCounterReset ( other )
if ! h . UsesCustomBuckets ( ) {
otherZeroCount := h . reconcileZeroBuckets ( other )
otherZeroCount , _ := h . reconcileZeroBuckets ( other , nil )
h . ZeroCount += otherZeroCount
}
h . Count += other . Count
@ -374,11 +376,11 @@ func (h *FloatHistogram) Add(other *FloatHistogram) (res *FloatHistogram, counte
intersectedBounds := intersectCustomBucketBounds ( h . CustomValues , other . CustomValues )
// Add with mapping - maps both histograms to intersected layout.
h . PositiveSpans , h . PositiveBuckets = addCustomBucketsWithMismatches (
h . PositiveSpans , h . PositiveBuckets , _ = addCustomBucketsWithMismatches (
false ,
hPositiveSpans , hPositiveBuckets , h . CustomValues ,
otherPositiveSpans , otherPositiveBuckets , other . CustomValues ,
intersectedBounds )
nil , intersectedBounds )
h . CustomValues = intersectedBounds
}
return h , counterResetCollision , nhcbBoundsReconciled , nil
@ -408,6 +410,121 @@ func (h *FloatHistogram) Add(other *FloatHistogram) (res *FloatHistogram, counte
return h , counterResetCollision , nhcbBoundsReconciled , nil
}
// KahanAdd works like Add but using the Kahan summation algorithm to minimize numerical errors.
// c is a histogram holding the Kahan compensation term. It is modified in-place if non-nil.
// If c is nil, a new compensation histogram is created inside the function. In this case,
// the caller must use the returned updatedC, because the original c variable is not modified.
func ( h * FloatHistogram ) KahanAdd ( other , c * FloatHistogram ) ( updatedC * FloatHistogram , counterResetCollision , nhcbBoundsReconciled bool , err error ) {
if err := h . checkSchemaAndBounds ( other ) ; err != nil {
return nil , false , false , err
}
counterResetCollision = h . adjustCounterReset ( other )
if c == nil {
c = h . newCompensationHistogram ( )
}
if ! h . UsesCustomBuckets ( ) {
otherZeroCount , otherCZeroCount := h . reconcileZeroBuckets ( other , c )
h . ZeroCount , c . ZeroCount = kahansum . Inc ( otherZeroCount , h . ZeroCount , c . ZeroCount )
h . ZeroCount , c . ZeroCount = kahansum . Inc ( otherCZeroCount , h . ZeroCount , c . ZeroCount )
}
h . Count , c . Count = kahansum . Inc ( other . Count , h . Count , c . Count )
h . Sum , c . Sum = kahansum . Inc ( other . Sum , h . Sum , c . Sum )
var (
hPositiveSpans = h . PositiveSpans
hPositiveBuckets = h . PositiveBuckets
otherPositiveSpans = other . PositiveSpans
otherPositiveBuckets = other . PositiveBuckets
cPositiveBuckets = c . PositiveBuckets
)
if h . UsesCustomBuckets ( ) {
if CustomBucketBoundsMatch ( h . CustomValues , other . CustomValues ) {
h . PositiveSpans , h . PositiveBuckets , c . PositiveBuckets = kahanAddBuckets (
h . Schema , h . ZeroThreshold , false ,
hPositiveSpans , hPositiveBuckets ,
otherPositiveSpans , otherPositiveBuckets ,
cPositiveBuckets , nil ,
)
} else {
nhcbBoundsReconciled = true
intersectedBounds := intersectCustomBucketBounds ( h . CustomValues , other . CustomValues )
// Add with mapping - maps both histograms to intersected layout.
h . PositiveSpans , h . PositiveBuckets , c . PositiveBuckets = addCustomBucketsWithMismatches (
false ,
hPositiveSpans , hPositiveBuckets , h . CustomValues ,
otherPositiveSpans , otherPositiveBuckets , other . CustomValues ,
cPositiveBuckets , intersectedBounds )
h . CustomValues = intersectedBounds
c . CustomValues = intersectedBounds
}
c . PositiveSpans = h . PositiveSpans
return c , counterResetCollision , nhcbBoundsReconciled , nil
}
otherC := other . newCompensationHistogram ( )
var (
hNegativeSpans = h . NegativeSpans
hNegativeBuckets = h . NegativeBuckets
otherNegativeSpans = other . NegativeSpans
otherNegativeBuckets = other . NegativeBuckets
cNegativeBuckets = c . NegativeBuckets
otherCPositiveBuckets = otherC . PositiveBuckets
otherCNegativeBuckets = otherC . NegativeBuckets
)
switch {
case other . Schema < h . Schema :
hPositiveSpans , hPositiveBuckets , cPositiveBuckets = kahanReduceResolution (
hPositiveSpans , hPositiveBuckets , cPositiveBuckets ,
h . Schema , other . Schema ,
true ,
)
hNegativeSpans , hNegativeBuckets , cNegativeBuckets = kahanReduceResolution (
hNegativeSpans , hNegativeBuckets , cNegativeBuckets ,
h . Schema , other . Schema ,
true ,
)
h . Schema = other . Schema
case other . Schema > h . Schema :
otherPositiveSpans , otherPositiveBuckets , otherCPositiveBuckets = kahanReduceResolution (
otherPositiveSpans , otherPositiveBuckets , otherCPositiveBuckets ,
other . Schema , h . Schema ,
false ,
)
otherNegativeSpans , otherNegativeBuckets , otherCNegativeBuckets = kahanReduceResolution (
otherNegativeSpans , otherNegativeBuckets , otherCNegativeBuckets ,
other . Schema , h . Schema ,
false ,
)
}
h . PositiveSpans , h . PositiveBuckets , c . PositiveBuckets = kahanAddBuckets (
h . Schema , h . ZeroThreshold , false ,
hPositiveSpans , hPositiveBuckets ,
otherPositiveSpans , otherPositiveBuckets ,
cPositiveBuckets , otherCPositiveBuckets ,
)
h . NegativeSpans , h . NegativeBuckets , c . NegativeBuckets = kahanAddBuckets (
h . Schema , h . ZeroThreshold , false ,
hNegativeSpans , hNegativeBuckets ,
otherNegativeSpans , otherNegativeBuckets ,
cNegativeBuckets , otherCNegativeBuckets ,
)
c . Schema = h . Schema
c . ZeroThreshold = h . ZeroThreshold
c . PositiveSpans = h . PositiveSpans
c . NegativeSpans = h . NegativeSpans
return c , counterResetCollision , nhcbBoundsReconciled , nil
}
// Sub works like Add but subtracts the other histogram. It uses the same logic
// to adjust the counter reset hint. This is useful where this method is used
// for incremental mean calculation. However, if it is used for the actual "-"
@ -419,7 +536,7 @@ func (h *FloatHistogram) Sub(other *FloatHistogram) (res *FloatHistogram, counte
}
counterResetCollision = h . adjustCounterReset ( other )
if ! h . UsesCustomBuckets ( ) {
otherZeroCount := h . reconcileZeroBuckets ( other )
otherZeroCount , _ := h . reconcileZeroBuckets ( other , nil )
h . ZeroCount -= otherZeroCount
}
h . Count -= other . Count
@ -440,11 +557,11 @@ func (h *FloatHistogram) Sub(other *FloatHistogram) (res *FloatHistogram, counte
intersectedBounds := intersectCustomBucketBounds ( h . CustomValues , other . CustomValues )
// Subtract with mapping - maps both histograms to intersected layout.
h . PositiveSpans , h . PositiveBuckets = addCustomBucketsWithMismatches (
h . PositiveSpans , h . PositiveBuckets , _ = addCustomBucketsWithMismatches (
true ,
hPositiveSpans , hPositiveBuckets , h . CustomValues ,
otherPositiveSpans , otherPositiveBuckets , other . CustomValues ,
intersectedBounds )
nil , intersectedBounds )
h . CustomValues = intersectedBounds
}
return h , counterResetCollision , nhcbBoundsReconciled , nil
@ -576,15 +693,28 @@ func (h *FloatHistogram) Size() int {
// easier to iterate through. Still, the safest bet is to use maxEmptyBuckets==0
// and only use a larger number if you know what you are doing.
func ( h * FloatHistogram ) Compact ( maxEmptyBuckets int ) * FloatHistogram {
h . PositiveBuckets , h . PositiveSpans = compactBuckets (
h . PositiveBuckets , h . PositiveSpans , maxEmptyBuckets , false ,
h . PositiveBuckets , _ , h . PositiveSpans = compactBuckets (
h . PositiveBuckets , nil , h . PositiveSpans , maxEmptyBuckets , false ,
)
h . NegativeBuckets , h . NegativeSpans = compactBuckets (
h . NegativeBuckets , h . NegativeSpans , maxEmptyBuckets , false ,
h . NegativeBuckets , _ , h . NegativeSpans = compactBuckets (
h . NegativeBuckets , nil , h . NegativeSpans , maxEmptyBuckets , false ,
)
return h
}
// kahanCompact works like Compact, but it is specialized for FloatHistogram's KahanAdd method.
// c is a histogram holding the Kahan compensation term.
func ( h * FloatHistogram ) kahanCompact ( maxEmptyBuckets int , c * FloatHistogram ,
) ( updatedH , updatedC * FloatHistogram ) {
h . PositiveBuckets , c . PositiveBuckets , h . PositiveSpans = compactBuckets (
h . PositiveBuckets , c . PositiveBuckets , h . PositiveSpans , maxEmptyBuckets , false ,
)
h . NegativeBuckets , c . NegativeBuckets , h . NegativeSpans = compactBuckets (
h . NegativeBuckets , c . NegativeBuckets , h . NegativeSpans , maxEmptyBuckets , false ,
)
return h , c
}
// DetectReset returns true if the receiving histogram is missing any buckets
// that have a non-zero population in the provided previous histogram. It also
// returns true if any count (in any bucket, in the zero count, or in the count
@ -652,7 +782,7 @@ func (h *FloatHistogram) DetectReset(previous *FloatHistogram) bool {
// ZeroThreshold decreased.
return true
}
previousZeroCount , newThreshold := previous . zeroCountForLargerThreshold ( h . ZeroThreshold )
previousZeroCount , newThreshold , _ := previous . zeroCountForLargerThreshold ( h . ZeroThreshold , nil )
if newThreshold != h . ZeroThreshold {
// ZeroThreshold is within a populated bucket in previous
// histogram.
@ -847,30 +977,42 @@ func (h *FloatHistogram) Validate() error {
}
// zeroCountForLargerThreshold returns what the histogram's zero count would be
// if the ZeroThreshold had the provided larger (or equal) value. If the
// provided value is less than the histogram's ZeroThreshold, the method panics.
// if the ZeroThreshold had the provided larger (or equal) value. It also returns the
// zero count of the compensation histogram `c` if provided (used for Kahan summation).
//
// If the provided ZeroThreshold is less than the histogram's ZeroThreshold, the method panics.
// If the largerThreshold ends up within a populated bucket of the histogram, it
// is adjusted upwards to the lower limit of that bucket (all in terms of
// absolute values) and that bucket's count is included in the returned
// count. The adjusted threshold is returned, too.
func ( h * FloatHistogram ) zeroCountForLargerThreshold ( largerThreshold float64 ) ( count , threshold float64 ) {
func ( h * FloatHistogram ) zeroCountForLargerThreshold (
largerThreshold float64 , c * FloatHistogram ) ( hZeroCount , threshold , cZeroCount float64 ,
) {
if c != nil {
cZeroCount = c . ZeroCount
}
// Fast path.
if largerThreshold == h . ZeroThreshold {
return h . ZeroCount , largerThreshold
return h . ZeroCount , largerThreshold , cZeroCount
}
if largerThreshold < h . ZeroThreshold {
panic ( fmt . Errorf ( "new threshold %f is less than old threshold %f" , largerThreshold , h . ZeroThreshold ) )
}
outer :
for {
c ount = h . ZeroCount
hZeroC ount = h . ZeroCount
i := h . PositiveBucketIterator ( )
bucketsIdx := 0
for i . Next ( ) {
b := i . At ( )
if b . Lower >= largerThreshold {
break
}
count += b . Count // Bucket to be merged into zero bucket.
// Bucket to be merged into zero bucket.
hZeroCount , cZeroCount = kahansum . Inc ( b . Count , hZeroCount , cZeroCount )
if c != nil {
hZeroCount , cZeroCount = kahansum . Inc ( c . PositiveBuckets [ bucketsIdx ] , hZeroCount , cZeroCount )
}
if b . Upper > largerThreshold {
// New threshold ended up within a bucket. if it's
// populated, we need to adjust largerThreshold before
@ -880,14 +1022,20 @@ outer:
}
break
}
bucketsIdx ++
}
i = h . NegativeBucketIterator ( )
bucketsIdx = 0
for i . Next ( ) {
b := i . At ( )
if b . Upper <= - largerThreshold {
break
}
count += b . Count // Bucket to be merged into zero bucket.
// Bucket to be merged into zero bucket.
hZeroCount , cZeroCount = kahansum . Inc ( b . Count , hZeroCount , cZeroCount )
if c != nil {
hZeroCount , cZeroCount = kahansum . Inc ( c . NegativeBuckets [ bucketsIdx ] , hZeroCount , cZeroCount )
}
if b . Lower < - largerThreshold {
// New threshold ended up within a bucket. If
// it's populated, we need to adjust
@ -900,15 +1048,17 @@ outer:
}
break
}
bucketsIdx ++
}
return c ount, largerThreshold
return hZeroC ount, largerThreshold , cZeroCount
}
}
// trimBucketsInZeroBucket removes all buckets that are within the zero
// bucket. It assumes that the zero threshold is at a bucket boundary and that
// the counts in the buckets to remove are already part of the zero count.
func ( h * FloatHistogram ) trimBucketsInZeroBucket ( ) {
// c is a histogram holding the Kahan compensation term.
func ( h * FloatHistogram ) trimBucketsInZeroBucket ( c * FloatHistogram ) {
i := h . PositiveBucketIterator ( )
bucketsIdx := 0
for i . Next ( ) {
@ -917,6 +1067,9 @@ func (h *FloatHistogram) trimBucketsInZeroBucket() {
break
}
h . PositiveBuckets [ bucketsIdx ] = 0
if c != nil {
c . PositiveBuckets [ bucketsIdx ] = 0
}
bucketsIdx ++
}
i = h . NegativeBucketIterator ( )
@ -927,34 +1080,46 @@ func (h *FloatHistogram) trimBucketsInZeroBucket() {
break
}
h . NegativeBuckets [ bucketsIdx ] = 0
if c != nil {
c . NegativeBuckets [ bucketsIdx ] = 0
}
bucketsIdx ++
}
// We are abusing Compact to trim the buckets set to zero
// above. Premature compacting could cause additional cost, but this
// code path is probably rarely used anyway.
h . Compact ( 0 )
if c != nil {
h . kahanCompact ( 0 , c )
} else {
h . Compact ( 0 )
}
}
// reconcileZeroBuckets finds a zero bucket large enough to include the zero
// buckets of both histograms (the receiving histogram and the other histogram)
// with a zero threshold that is not within a populated bucket in either
// histogram. This method modifies the receiving histogram accordingly, but
// leaves the other histogram as is. Instead, it returns the zero count the
// other histogram would have if it were modified.
func ( h * FloatHistogram ) reconcileZeroBuckets ( other * FloatHistogram ) float64 {
otherZeroCount := other . ZeroCount
// histogram. This method modifies the receiving histogram accordingly, and
// also modifies the compensation histogram `c` (used for Kahan summation) if provided,
// but leaves the other histogram as is. Instead, it returns the zero count the
// other histogram would have if it were modified, as well as its Kahan compensation term.
func ( h * FloatHistogram ) reconcileZeroBuckets ( other , c * FloatHistogram ) ( otherZeroCount , otherCZeroCount float64 ) {
otherZeroCount = other . ZeroCount
otherZeroThreshold := other . ZeroThreshold
for otherZeroThreshold != h . ZeroThreshold {
if h . ZeroThreshold > otherZeroThreshold {
otherZeroCount , otherZeroThreshold = other . zeroCountForLargerThreshold ( h . ZeroThreshold )
otherZeroCount , otherZeroThreshold , otherCZeroCount = other . zeroCountForLargerThreshold ( h . ZeroThreshold , nil )
}
if otherZeroThreshold > h . ZeroThreshold {
h . ZeroCount , h . ZeroThreshold = h . zeroCountForLargerThreshold ( otherZeroThreshold )
h . trimBucketsInZeroBucket ( )
var cZeroCount float64
h . ZeroCount , h . ZeroThreshold , cZeroCount = h . zeroCountForLargerThreshold ( otherZeroThreshold , c )
if c != nil {
c . ZeroCount = cZeroCount
}
h . trimBucketsInZeroBucket ( c )
}
}
return otherZeroCount
return otherZeroCount , otherCZeroCount
}
// floatBucketIterator is a low-level constructor for bucket iterators.
@ -1369,6 +1534,145 @@ func addBuckets(
return spansA , bucketsA
}
// kahanAddBuckets works like addBuckets but it is used in FloatHistogram's KahanAdd method
// and takes additional arguments, compensationBucketsA and compensationBucketsB,
// which hold the Kahan compensation values associated with histograms A and B.
// It returns the resulting spans/buckets and compensation buckets.
func kahanAddBuckets (
schema int32 , threshold float64 , negative bool ,
spansA [ ] Span , bucketsA [ ] float64 ,
spansB [ ] Span , bucketsB [ ] float64 ,
compensationBucketsA , compensationBucketsB [ ] float64 ,
) ( newSpans [ ] Span , newBucketsA , newBucketsC [ ] float64 ) {
var (
iSpan = - 1
iBucket = - 1
iInSpan int32
indexA int32
indexB int32
bIdxB int
bucketB float64
compensationBucketB float64
deltaIndex int32
lowerThanThreshold = true
)
for _ , spanB := range spansB {
indexB += spanB . Offset
for j := 0 ; j < int ( spanB . Length ) ; j ++ {
if lowerThanThreshold && IsExponentialSchema ( schema ) && getBoundExponential ( indexB , schema ) <= threshold {
goto nextLoop
}
lowerThanThreshold = false
bucketB = bucketsB [ bIdxB ]
if compensationBucketsB != nil {
compensationBucketB = compensationBucketsB [ bIdxB ]
}
if negative {
bucketB *= - 1
compensationBucketB *= - 1
}
if iSpan == - 1 {
if len ( spansA ) == 0 || spansA [ 0 ] . Offset > indexB {
// Add bucket before all others.
bucketsA = append ( bucketsA , 0 )
copy ( bucketsA [ 1 : ] , bucketsA )
bucketsA [ 0 ] = bucketB
compensationBucketsA = append ( compensationBucketsA , 0 )
copy ( compensationBucketsA [ 1 : ] , compensationBucketsA )
compensationBucketsA [ 0 ] = compensationBucketB
if len ( spansA ) > 0 && spansA [ 0 ] . Offset == indexB + 1 {
spansA [ 0 ] . Length ++
spansA [ 0 ] . Offset --
goto nextLoop
}
spansA = append ( spansA , Span { } )
copy ( spansA [ 1 : ] , spansA )
spansA [ 0 ] = Span { Offset : indexB , Length : 1 }
if len ( spansA ) > 1 {
// Convert the absolute offset in the formerly
// first span to a relative offset.
spansA [ 1 ] . Offset -= indexB + 1
}
goto nextLoop
} else if spansA [ 0 ] . Offset == indexB {
// Just add to first bucket.
bucketsA [ 0 ] , compensationBucketsA [ 0 ] = kahansum . Inc ( bucketB , bucketsA [ 0 ] , compensationBucketsA [ 0 ] )
bucketsA [ 0 ] , compensationBucketsA [ 0 ] = kahansum . Inc ( compensationBucketB , bucketsA [ 0 ] , compensationBucketsA [ 0 ] )
goto nextLoop
}
iSpan , iBucket , iInSpan = 0 , 0 , 0
indexA = spansA [ 0 ] . Offset
}
deltaIndex = indexB - indexA
for {
remainingInSpan := int32 ( spansA [ iSpan ] . Length ) - iInSpan
if deltaIndex < remainingInSpan {
// Bucket is in current span.
iBucket += int ( deltaIndex )
iInSpan += deltaIndex
bucketsA [ iBucket ] , compensationBucketsA [ iBucket ] = kahansum . Inc ( bucketB , bucketsA [ iBucket ] , compensationBucketsA [ iBucket ] )
bucketsA [ iBucket ] , compensationBucketsA [ iBucket ] = kahansum . Inc ( compensationBucketB , bucketsA [ iBucket ] , compensationBucketsA [ iBucket ] )
break
}
deltaIndex -= remainingInSpan
iBucket += int ( remainingInSpan )
iSpan ++
if iSpan == len ( spansA ) || deltaIndex < spansA [ iSpan ] . Offset {
// Bucket is in gap behind previous span (or there are no further spans).
bucketsA = append ( bucketsA , 0 )
copy ( bucketsA [ iBucket + 1 : ] , bucketsA [ iBucket : ] )
bucketsA [ iBucket ] = bucketB
compensationBucketsA = append ( compensationBucketsA , 0 )
copy ( compensationBucketsA [ iBucket + 1 : ] , compensationBucketsA [ iBucket : ] )
compensationBucketsA [ iBucket ] = compensationBucketB
switch {
case deltaIndex == 0 :
// Directly after previous span, extend previous span.
if iSpan < len ( spansA ) {
spansA [ iSpan ] . Offset --
}
iSpan --
iInSpan = int32 ( spansA [ iSpan ] . Length )
spansA [ iSpan ] . Length ++
goto nextLoop
case iSpan < len ( spansA ) && deltaIndex == spansA [ iSpan ] . Offset - 1 :
// Directly before next span, extend next span.
iInSpan = 0
spansA [ iSpan ] . Offset --
spansA [ iSpan ] . Length ++
goto nextLoop
default :
// No next span, or next span is not directly adjacent to new bucket.
// Add new span.
iInSpan = 0
if iSpan < len ( spansA ) {
spansA [ iSpan ] . Offset -= deltaIndex + 1
}
spansA = append ( spansA , Span { } )
copy ( spansA [ iSpan + 1 : ] , spansA [ iSpan : ] )
spansA [ iSpan ] = Span { Length : 1 , Offset : deltaIndex }
goto nextLoop
}
} else {
// Try start of next span.
deltaIndex -= spansA [ iSpan ] . Offset
iInSpan = 0
}
}
nextLoop :
indexA = indexB
indexB ++
bIdxB ++
}
}
return spansA , bucketsA , compensationBucketsA
}
// floatBucketsMatch compares bucket values of two float histograms using binary float comparison
// and returns true if all values match.
func floatBucketsMatch ( b1 , b2 [ ] float64 ) bool {
@ -1496,15 +1800,18 @@ func intersectCustomBucketBounds(boundsA, boundsB []float64) []float64 {
// addCustomBucketsWithMismatches handles adding/subtracting custom bucket histograms
// with mismatched bucket layouts by mapping both to an intersected layout.
// It also processes the Kahan compensation term if provided.
func addCustomBucketsWithMismatches (
negative bool ,
spansA [ ] Span , bucketsA , boundsA [ ] float64 ,
spansB [ ] Span , bucketsB , boundsB [ ] float64 ,
bucketsC [ ] float64 ,
intersectedBounds [ ] float64 ,
) ( [ ] Span , [ ] float64 ) {
) ( [ ] Span , [ ] float64 , [ ] float64 ) {
targetBuckets := make ( [ ] float64 , len ( intersectedBounds ) + 1 )
cTargetBuckets := make ( [ ] float64 , len ( intersectedBounds ) + 1 )
mapBuckets := func ( spans [ ] Span , buckets , bounds [ ] float64 , negative bool ) {
mapBuckets := func ( spans [ ] Span , buckets , bounds [ ] float64 , negative , withCompensation bool ) {
srcIdx := 0
bucketIdx := 0
intersectIdx := 0
@ -1530,9 +1837,12 @@ func addCustomBucketsWithMismatches(
}
if negative {
targetBuckets [ targetIdx ] -= value
targetBuckets [ targetIdx ] , cTargetBuckets [ targetIdx ] = kahansum . Dec ( value , targetBuckets [ targetIdx ] , cTargetBuckets [ targetIdx ] )
} else {
targetBuckets [ targetIdx ] += value
targetBuckets [ targetIdx ] , cTargetBuckets [ targetIdx ] = kahansum . Inc ( value , targetBuckets [ targetIdx ] , cTargetBuckets [ targetIdx ] )
if withCompensation && bucketsC != nil {
targetBuckets [ targetIdx ] , cTargetBuckets [ targetIdx ] = kahansum . Inc ( bucketsC [ bucketIdx ] , targetBuckets [ targetIdx ] , cTargetBuckets [ targetIdx ] )
}
}
}
srcIdx ++
@ -1541,21 +1851,23 @@ func addCustomBucketsWithMismatches(
}
}
// Map both histograms to the intersected layout.
mapBuckets ( spansA , bucketsA , boundsA , false )
mapBuckets ( spansB , bucketsB , boundsB , negative )
// Map histograms to the intersected layout.
mapBuckets ( spansA , bucketsA , boundsA , false , true )
mapBuckets ( spansB , bucketsB , boundsB , negative , false )
// Build spans and buckets, excluding zero-valued buckets from the final result.
destSpans := spansA [ : 0 ] // Reuse spansA capacity for destSpans since we don't need it anymore.
destBuckets := targetBuckets [ : 0 ] // Reuse targetBuckets capacity for destBuckets since it's guaranteed to be large enough.
destSpans := spansA [ : 0 ] // Reuse spansA capacity for destSpans since we don't need it anymore.
destBuckets := targetBuckets [ : 0 ] // Reuse targetBuckets capacity for destBuckets since it's guaranteed to be large enough.
cDestBuckets := cTargetBuckets [ : 0 ] // Reuse cTargetBuckets capacity for cDestBuckets since it's guaranteed to be large enough.
lastIdx := int32 ( - 1 )
for i , count := range targetBuckets {
if count == 0 {
for i := range targetBuckets {
if targetBuckets [ i ] == 0 && cTargetBuckets [ i ] == 0 {
continue
}
destBuckets = append ( destBuckets , count )
destBuckets = append ( destBuckets , targetBuckets [ i ] )
cDestBuckets = append ( cDestBuckets , cTargetBuckets [ i ] )
idx := int32 ( i )
if len ( destSpans ) > 0 && idx == lastIdx + 1 {
@ -1578,7 +1890,7 @@ func addCustomBucketsWithMismatches(
lastIdx = idx
}
return destSpans , destBuckets
return destSpans , destBuckets , cDestBuckets
}
// ReduceResolution reduces the float histogram's spans, buckets into target schema.
@ -1618,6 +1930,121 @@ func (h *FloatHistogram) ReduceResolution(targetSchema int32) error {
return nil
}
// kahanReduceResolution works like reduceResolution, but it is specialized for FloatHistogram's KahanAdd method.
// Unlike reduceResolution, which supports both float and integer buckets, this function only operates on float buckets.
// It also takes an additional argument, originCompensationBuckets, representing the compensation buckets for the origin histogram.
// Modifies both the origin histogram buckets and their associated compensation buckets.
func kahanReduceResolution (
originSpans [ ] Span ,
originReceivingBuckets [ ] float64 ,
originCompensationBuckets [ ] float64 ,
originSchema ,
targetSchema int32 ,
inplace bool ,
) ( newSpans [ ] Span , newReceivingBuckets , newCompensationBuckets [ ] float64 ) {
var (
targetSpans [ ] Span // The spans in the target schema.
targetReceivingBuckets [ ] float64 // The receiving bucket counts in the target schema.
targetCompensationBuckets [ ] float64 // The compensation bucket counts in the target schema.
bucketIdx int32 // The index of bucket in the origin schema.
bucketCountIdx int // The position of a bucket in origin bucket count slice `originBuckets`.
targetBucketIdx int32 // The index of bucket in the target schema.
lastTargetBucketIdx int32 // The index of the last added target bucket.
)
if inplace {
// Slice reuse is safe because when reducing the resolution,
// target slices don't grow faster than origin slices are being read.
targetSpans = originSpans [ : 0 ]
targetReceivingBuckets = originReceivingBuckets [ : 0 ]
targetCompensationBuckets = originCompensationBuckets [ : 0 ]
}
for _ , span := range originSpans {
// Determine the index of the first bucket in this span.
bucketIdx += span . Offset
for j := 0 ; j < int ( span . Length ) ; j ++ {
// Determine the index of the bucket in the target schema from the index in the original schema.
targetBucketIdx = targetIdx ( bucketIdx , originSchema , targetSchema )
switch {
case len ( targetSpans ) == 0 :
// This is the first span in the targetSpans.
span := Span {
Offset : targetBucketIdx ,
Length : 1 ,
}
targetSpans = append ( targetSpans , span )
targetReceivingBuckets = append ( targetReceivingBuckets , originReceivingBuckets [ bucketCountIdx ] )
lastTargetBucketIdx = targetBucketIdx
targetCompensationBuckets = append ( targetCompensationBuckets , originCompensationBuckets [ bucketCountIdx ] )
case lastTargetBucketIdx == targetBucketIdx :
// The current bucket has to be merged into the same target bucket as the previous bucket.
lastBucketIdx := len ( targetReceivingBuckets ) - 1
targetReceivingBuckets [ lastBucketIdx ] , targetCompensationBuckets [ lastBucketIdx ] = kahansum . Inc (
originReceivingBuckets [ bucketCountIdx ] ,
targetReceivingBuckets [ lastBucketIdx ] ,
targetCompensationBuckets [ lastBucketIdx ] ,
)
targetReceivingBuckets [ lastBucketIdx ] , targetCompensationBuckets [ lastBucketIdx ] = kahansum . Inc (
originCompensationBuckets [ bucketCountIdx ] ,
targetReceivingBuckets [ lastBucketIdx ] ,
targetCompensationBuckets [ lastBucketIdx ] ,
)
case ( lastTargetBucketIdx + 1 ) == targetBucketIdx :
// The current bucket has to go into a new target bucket,
// and that bucket is next to the previous target bucket,
// so we add it to the current target span.
targetSpans [ len ( targetSpans ) - 1 ] . Length ++
lastTargetBucketIdx ++
targetReceivingBuckets = append ( targetReceivingBuckets , originReceivingBuckets [ bucketCountIdx ] )
targetCompensationBuckets = append ( targetCompensationBuckets , originCompensationBuckets [ bucketCountIdx ] )
case ( lastTargetBucketIdx + 1 ) < targetBucketIdx :
// The current bucket has to go into a new target bucket,
// and that bucket is separated by a gap from the previous target bucket,
// so we need to add a new target span.
span := Span {
Offset : targetBucketIdx - lastTargetBucketIdx - 1 ,
Length : 1 ,
}
targetSpans = append ( targetSpans , span )
lastTargetBucketIdx = targetBucketIdx
targetReceivingBuckets = append ( targetReceivingBuckets , originReceivingBuckets [ bucketCountIdx ] )
targetCompensationBuckets = append ( targetCompensationBuckets , originCompensationBuckets [ bucketCountIdx ] )
}
bucketIdx ++
bucketCountIdx ++
}
}
return targetSpans , targetReceivingBuckets , targetCompensationBuckets
}
// newCompensationHistogram initializes a new compensation histogram that can be used
// alongside the current FloatHistogram in Kahan summation.
// The compensation histogram is structured to match the receiving histogram's bucket layout
// including its schema, zero threshold and custom values, and it shares spans with the receiving
// histogram. However, the bucket values in the compensation histogram are initialized to zero.
func ( h * FloatHistogram ) newCompensationHistogram ( ) * FloatHistogram {
c := & FloatHistogram {
CounterResetHint : h . CounterResetHint ,
Schema : h . Schema ,
ZeroThreshold : h . ZeroThreshold ,
CustomValues : h . CustomValues ,
PositiveBuckets : make ( [ ] float64 , len ( h . PositiveBuckets ) ) ,
PositiveSpans : h . PositiveSpans ,
NegativeSpans : h . NegativeSpans ,
}
if ! h . UsesCustomBuckets ( ) {
c . NegativeBuckets = make ( [ ] float64 , len ( h . NegativeBuckets ) )
}
return c
}
// checkSchemaAndBounds checks if two histograms are compatible because they
// both use a standard exponential schema or because they both are NHCBs.
func ( h * FloatHistogram ) checkSchemaAndBounds ( other * FloatHistogram ) error {
@ -1659,3 +2086,27 @@ func (h *FloatHistogram) adjustCounterReset(other *FloatHistogram) (counterReset
}
return false
}
// HasOverflow reports whether any of the FloatHistogram's fields contain an infinite value.
// This can happen when aggregating multiple histograms and exceeding float64 capacity.
func ( h * FloatHistogram ) HasOverflow ( ) bool {
if math . IsInf ( h . ZeroCount , 0 ) || math . IsInf ( h . Count , 0 ) || math . IsInf ( h . Sum , 0 ) {
return true
}
for _ , v := range h . PositiveBuckets {
if math . IsInf ( v , 0 ) {
return true
}
}
for _ , v := range h . NegativeBuckets {
if math . IsInf ( v , 0 ) {
return true
}
}
for _ , v := range h . CustomValues {
if math . IsInf ( v , 0 ) {
return true
}
}
return false
}