@ -19,7 +19,10 @@ import (
"github.com/prometheus/prometheus/model/histogram"
)
func writeHistogramChunkLayout ( b * bstream , schema int32 , zeroThreshold float64 , positiveSpans , negativeSpans [ ] histogram . Span ) {
func writeHistogramChunkLayout (
b * bstream , schema int32 , zeroThreshold float64 ,
positiveSpans , negativeSpans [ ] histogram . Span ,
) {
putZeroThreshold ( b , zeroThreshold )
putVarbitInt ( b , int64 ( schema ) )
putHistogramChunkLayoutSpans ( b , positiveSpans )
@ -91,9 +94,7 @@ func readHistogramChunkLayoutSpans(b *bstreamReader) ([]histogram.Span, error) {
// putZeroThreshold writes the zero threshold to the bstream. It stores typical
// values in just one byte, but needs 9 bytes for other values. In detail:
//
// * If the threshold is 0, store a single zero byte.
//
// - If the threshold is 0, store a single zero byte.
// - If the threshold is a power of 2 between (and including) 2^-243 and 2^10,
// take the exponent from the IEEE 754 representation of the threshold, which
// covers a range between (and including) -242 and 11. (2^-243 is 0.5*2^-242
@ -103,7 +104,6 @@ func readHistogramChunkLayoutSpans(b *bstreamReader) ([]histogram.Span, error) {
// threshold. The default value for the zero threshold is 2^-128 (or
// 0.5*2^-127 in IEEE 754 representation) and will therefore be encoded as a
// single byte (with value 116).
//
// - In all other cases, store 255 as a single byte, followed by the 8 bytes of
// the threshold as a float64, i.e. taking 9 bytes in total.
func putZeroThreshold ( b * bstream , threshold float64 ) {
@ -186,16 +186,16 @@ func (b *bucketIterator) Next() (int, bool) {
return 0 , false
}
// An Interjection describes how many new buckets have to be introduc ed before
// processing the pos'th delta from the original slice.
type Interjection struct {
// An Insert describes how many new buckets have to be insert ed before
// processing the pos'th bucket from the original slice.
type Insert struct {
pos int
num int
}
// forwardCompareSpans returns the interjections to convert a slice of deltas to a new
// slice representing an expanded set of buckets, or false if incompatible
// (e.g. if buckets were removed) .
// expandSpansForward returns the inserts to expand the bucket spans 'a' so that
// they match the spans in 'b'. 'b' must cover the same or more buckets than
// 'a', otherwise the function will return false .
//
// Example:
//
@ -222,25 +222,25 @@ type Interjection struct {
// deltas 6 -3 -3 3 -3 0 2 2 1 -5 1
// delta mods: / \ / \ / \
//
// Note that whenever any new buckets are introduced, the subsequent "old"
// bucket needs to readjust its delta to the new base of 0. Thus, for the caller
// who wants to transform the set of original deltas to a new set of deltas to
// match a new span layout that adds buckets, we simply need to generate a list
// of interjection s.
// Note for histograms with delta-encoded buckets: Whenever any new buckets are
// introduced, the subsequent "old" bucket needs to readjust its delta to the
// new base of 0. Thus, for the caller who wants to transform the set of
// original deltas to a new set of deltas to match a new span layout that adds
// buckets, we simply need to generate a list of insert s.
//
// Note: Within forwardCompareSpans we don't have to worry about the changes to the
// Note: Within expandSpansForward we don't have to worry about the changes to the
// spans themselves, thanks to the iterators we get to work with the more useful
// bucket indices (which of course directly correspond to the buckets we have to
// adjust).
func forwardCompareSpans ( a , b [ ] histogram . Span ) ( forward [ ] Interjection , ok bool ) {
func expandSpansForward ( a , b [ ] histogram . Span ) ( forward [ ] Insert , ok bool ) {
ai := newBucketIterator ( a )
bi := newBucketIterator ( b )
var interjections [ ] Interjection
var inserts [ ] Insert
// When inter.num becomes > 0, this becomes a valid interjection that
// should be yielded when we finish a streak of new buckets.
var inter Interjection
// When inter.num becomes > 0, this becomes a valid insert that should
// be yielded when we finish a streak of new buckets.
var inter Insert
av , aOK := ai . Next ( )
bv , bOK := bi . Next ( )
@ -250,43 +250,46 @@ loop:
case aOK && bOK :
switch {
case av == bv : // Both have an identical value. move on!
// Finish WIP interjection and reset.
// Finish WIP insert and reset.
if inter . num > 0 {
interjection s = append ( interjection s , inter )
insert s = append ( insert s , inter )
}
inter . num = 0
av , aOK = ai . Next ( )
bv , bOK = bi . Next ( )
inter . pos ++
case av < bv : // b misses a value that is in a.
return interjection s , false
return insert s , false
case av > bv : // a misses a value that is in b. Forward b and recompare.
inter . num ++
bv , bOK = bi . Next ( )
}
case aOK && ! bOK : // b misses a value that is in a.
return interjection s , false
return insert s , false
case ! aOK && bOK : // a misses a value that is in b. Forward b and recompare.
inter . num ++
bv , bOK = bi . Next ( )
default : // Both iterators ran out. We're done.
if inter . num > 0 {
interjection s = append ( interjection s , inter )
insert s = append ( insert s , inter )
}
break loop
}
}
return interjection s , true
return insert s , true
}
// bidirectionalCompareSpans does everything that forwardCompareSpans does and
// also returns interjections in the other direction (i.e. buckets missing in b that are missing in a).
func bidirectionalCompareSpans ( a , b [ ] histogram . Span ) ( forward , backward [ ] Interjection , mergedSpans [ ] histogram . Span ) {
// expandSpansBothWays is similar to expandSpansForward, but now b may also
// cover an entirely different set of buckets. The function returns the
// “forward” inserts to expand 'a' to also cover all the buckets exclusively
// covered by 'b', and it returns the “backward” inserts to expand 'b' to also
// cover all the buckets exclusively covered by 'a'
func expandSpansBothWays ( a , b [ ] histogram . Span ) ( forward , backward [ ] Insert , mergedSpans [ ] histogram . Span ) {
ai := newBucketIterator ( a )
bi := newBucketIterator ( b )
var interjections , bInterjections [ ] Interjection
var fInserts , bInserts [ ] Insert
var lastBucket int
addBucket := func ( b int ) {
offset := b - lastBucket - 1
@ -305,9 +308,10 @@ func bidirectionalCompareSpans(a, b []histogram.Span) (forward, backward []Inter
lastBucket = b
}
// When inter.num becomes > 0, this becomes a valid interjection that
// should be yielded when we finish a streak of new buckets.
var inter , bInter Interjection
// When fInter.num (or bInter.num, respectively) becomes > 0, this
// becomes a valid insert that should be yielded when we finish a streak
// of new buckets.
var fInter , bInter Insert
av , aOK := ai . Next ( )
bv , bOK := bi . Next ( )
@ -317,37 +321,37 @@ loop:
case aOK && bOK :
switch {
case av == bv : // Both have an identical value. move on!
// Finish WIP interjection and reset.
if i nter. num > 0 {
interjection s = append ( interjections , i nter)
i nter. num = 0
// Finish WIP insert and reset.
if fI nter. num > 0 {
fInsert s = append ( fInserts , fI nter)
fI nter. num = 0
}
if bInter . num > 0 {
bInterjection s = append ( bInterjection s , bInter )
bInsert s = append ( bInsert s , bInter )
bInter . num = 0
}
addBucket ( av )
av , aOK = ai . Next ( )
bv , bOK = bi . Next ( )
i nter. pos ++
fI nter. pos ++
bInter . pos ++
case av < bv : // b misses a value that is in a.
bInter . num ++
// Collect the forward interjection before advancing the
// position of 'a'.
if i nter. num > 0 {
interjection s = append ( interjections , i nter)
i nter. num = 0
// Collect the forward inserts before advancing
// the position of 'a'.
if fI nter. num > 0 {
fInsert s = append ( fInserts , fI nter)
fI nter. num = 0
}
addBucket ( av )
i nter. pos ++
fI nter. pos ++
av , aOK = ai . Next ( )
case av > bv : // a misses a value that is in b. Forward b and recompare.
i nter. num ++
// Collect the backward interjection before advancing the
fI nter. num ++
// Collect the backward inserts before advancing the
// position of 'b'.
if bInter . num > 0 {
bInterjection s = append ( bInterjection s , bInter )
bInsert s = append ( bInsert s , bInter )
bInter . num = 0
}
addBucket ( bv )
@ -359,92 +363,92 @@ loop:
addBucket ( av )
av , aOK = ai . Next ( )
case ! aOK && bOK : // a misses a value that is in b. Forward b and recompare.
i nter. num ++
fI nter. num ++
addBucket ( bv )
bv , bOK = bi . Next ( )
default : // Both iterators ran out. We're done.
if i nter. num > 0 {
interjection s = append ( interjections , i nter)
if fI nter. num > 0 {
fInsert s = append ( fInserts , fI nter)
}
if bInter . num > 0 {
bInterjection s = append ( bInterjection s , bInter )
bInsert s = append ( bInsert s , bInter )
}
break loop
}
}
return interjections , bInterjection s, mergedSpans
return fInserts , bInsert s, mergedSpans
}
type bucketValue interface {
int64 | float64
}
// interject merges 'in' with the provided interjections and writes them into
// 'out', which must already have the appropriate length.
func interject [ BV bucketValue ] ( in , out [ ] BV , interjections [ ] Interjection , deltas bool ) [ ] BV {
// insert merges 'in' with the provided inserts and writes them into 'out',
// which must already have the appropriate length. 'out' is also returned for
// convenience.
func insert [ BV bucketValue ] ( in , out [ ] BV , inserts [ ] Insert , deltas bool ) [ ] BV {
var (
j int // Position in out.
v BV // The last value seen.
interj int // The next interjection to process.
oi int // Position in out.
v BV // The last value seen.
ii int // The next insert to process.
)
for i , d := range in {
if interj < len ( interjections ) && i == interjections [ interj ] . pos {
// We have an interjection!
// Add interjection.num new delta values such that their bucket values equate 0.
// When deltas==false, it means that it is an absolute value. So we set it to 0 directly.
if ii < len ( inserts ) && i == inserts [ ii ] . pos {
// We have an insert!
// Add insert.num new delta values such that their
// bucket values equate 0. When deltas==false, it means
// that it is an absolute value. So we set it to 0
// directly.
if deltas {
out [ j ] = - v
out [ oi ] = - v
} else {
out [ j ] = 0
out [ oi ] = 0
}
j ++
for x := 1 ; x < interjections [ interj ] . num ; x ++ {
out [ j ] = 0
j ++
oi ++
for x := 1 ; x < inserts [ ii ] . num ; x ++ {
out [ oi ] = 0
oi ++
}
interj ++
ii ++
// Now save the value from the input. The delta value we
// should save is the original delta value + the last
// value of the point before the interjection (to undo
// the delta that was introduced by the interjection).
// When deltas==false, it means that it is an absolute value,
// value of the point before the insert (to undo the
// delta that was introduced by the insert). When
// deltas==false, it means that it is an absolute value,
// so we set it directly to the value in the 'in' slice.
if deltas {
out [ j ] = d + v
out [ oi ] = d + v
} else {
out [ j ] = d
out [ oi ] = d
}
j ++
oi ++
v = d + v
continue
}
// If there was no interjection, the original delta is still
// valid.
out [ j ] = d
j ++
// If there was no insert, the original delta is still valid.
out [ oi ] = d
oi ++
v += d
}
switch interj {
case len ( interjection s ) :
// All interjection s processed. Nothing more to do.
case len ( interjection s ) - 1 :
// One more interjection to process at the end.
switch ii {
case len ( insert s ) :
// All insert s processed. Nothing more to do.
case len ( insert s ) - 1 :
// One more insert to process at the end.
if deltas {
out [ j ] = - v
out [ oi ] = - v
} else {
out [ j ] = 0
out [ oi ] = 0
}
j ++
for x := 1 ; x < interjections [ interj ] . num ; x ++ {
out [ j ] = 0
j ++
oi ++
for x := 1 ; x < inserts [ ii ] . num ; x ++ {
out [ oi ] = 0
oi ++
}
default :
panic ( "unprocessed interjection s left" )
panic ( "unprocessed insert s left" )
}
return out
}