A lot of this code was hacked together, literally during a hackathon. This commit intends not to change the code substantially, but just make the code obey the usual style practices. A (possibly incomplete) list of areas: * Generally address linter warnings. * The `pkg` directory is deprecated as per dev-summit. No new packages should be added to it. I moved the new `pkg/histogram` package to `model` anticipating what's proposed in #9478. * Make the naming of the Sparse Histogram more consistent. Including abbreviations, there were just too many names for it: SparseHistogram, Histogram, Histo, hist, his, shs, h. The idea is to call it "Histogram" in general. Only add "Sparse" if it is needed to avoid confusion with conventional Histograms (which is rare because the TSDB really has no notion of conventional Histograms). Use abbreviations only in local scope, and then really abbreviate (not just removing three out of seven letters like in "Histo"). This is in the spirit of https://github.com/golang/go/wiki/CodeReviewComments#variable-names * Several other minor name changes. * A lot of formatting of doc comments. For one, following https://github.com/golang/go/wiki/CodeReviewComments#comment-sentences , but also layout questions, anticipating how things will look like when rendered by `godoc` (even where `godoc` doesn't render them right now because they are for unexported types or not a doc comment at all but just a normal code comment - consistency is queen!). * Re-enabled `TestQueryLog` and `TestEndpoints` (they pass now, leaving them disabled was presumably an oversight). * Bucket iterator for histogram.Histogram is now created with a method. * HistogramChunk.iterator now allows iterator recycling. (I think @dieterbe only commented it out because he was confused by the question in the comment.) * HistogramAppender.Append panics now because we decided to treat staleness marker differently. Signed-off-by: beorn7 <beorn@grafana.com>pull/9481/head
parent
fd5ea4e0b5
commit
7a8bb8222c
@ -0,0 +1,165 @@ |
||||
// Copyright 2021 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package histogram |
||||
|
||||
import ( |
||||
"fmt" |
||||
"testing" |
||||
|
||||
"github.com/stretchr/testify/require" |
||||
) |
||||
|
||||
// TestCumulativeBucketIterator checks that the cumulative iterator visits
// every bucket index from the first to the last populated one — including
// the gap buckets between spans — and that counts accumulate correctly.
func TestCumulativeBucketIterator(t *testing.T) {
	cases := []struct {
		histogram                 Histogram
		expectedCumulativeBuckets []Bucket
	}{
		{
			histogram: Histogram{
				Schema: 0,
				PositiveSpans: []Span{
					{Offset: 0, Length: 2},
					{Offset: 1, Length: 2},
				},
				PositiveBuckets: []int64{1, 1, -1, 0},
			},
			expectedCumulativeBuckets: []Bucket{
				{Upper: 1, Count: 1},
				{Upper: 2, Count: 3},

				{Upper: 4, Count: 3}, // Gap bucket, count stays flat.

				{Upper: 8, Count: 4},
				{Upper: 16, Count: 5},
			},
		},
		{
			histogram: Histogram{
				Schema: 0,
				PositiveSpans: []Span{
					{Offset: 0, Length: 5},
					{Offset: 1, Length: 1},
				},
				PositiveBuckets: []int64{1, 2, -2, 1, -1, 0},
			},
			expectedCumulativeBuckets: []Bucket{
				{Upper: 1, Count: 1},
				{Upper: 2, Count: 4},
				{Upper: 4, Count: 5},
				{Upper: 8, Count: 7},

				{Upper: 16, Count: 8},

				{Upper: 32, Count: 8}, // Gap bucket, count stays flat.
				{Upper: 64, Count: 9},
			},
		},
		{
			histogram: Histogram{
				Schema: 0,
				PositiveSpans: []Span{
					{Offset: 0, Length: 7},
				},
				PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0},
			},
			expectedCumulativeBuckets: []Bucket{
				{Upper: 1, Count: 1},
				{Upper: 2, Count: 4},
				{Upper: 4, Count: 5},
				{Upper: 8, Count: 7},
				{Upper: 16, Count: 8},
				{Upper: 32, Count: 9},
				{Upper: 64, Count: 10},
			},
		},
		{
			histogram: Histogram{
				Schema: 3,
				PositiveSpans: []Span{
					{Offset: -5, Length: 2}, // -5 -4
					{Offset: 2, Length: 3},  // -1 0 1
					{Offset: 2, Length: 2},  // 4 5
				},
				PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 3},
			},
			// NOTE(review): The index comments below were corrected; they
			// used to be off by one from index 0 onwards. The bucket with
			// upper boundary 1 is always bucket 0 (here, 1.0905… = 2^(1/8)
			// is index 1, …, 1.5422… = 2^(5/8) is index 5).
			expectedCumulativeBuckets: []Bucket{
				{Upper: 0.6484197773255048, Count: 1}, // -5
				{Upper: 0.7071067811865475, Count: 4}, // -4

				{Upper: 0.7711054127039704, Count: 4}, // -3 (gap)
				{Upper: 0.8408964152537144, Count: 4}, // -2 (gap)

				{Upper: 0.9170040432046711, Count: 5}, // -1
				{Upper: 1, Count: 7},                  // 0
				{Upper: 1.0905077326652577, Count: 8}, // 1

				{Upper: 1.189207115002721, Count: 8},  // 2 (gap)
				{Upper: 1.2968395546510096, Count: 8}, // 3 (gap)

				{Upper: 1.414213562373095, Count: 9},   // 4
				{Upper: 1.5422108254079407, Count: 13}, // 5
			},
		},
		{
			histogram: Histogram{
				Schema: -2,
				PositiveSpans: []Span{
					{Offset: -2, Length: 4}, // -2 -1 0 1
					{Offset: 2, Length: 2},  // 4 5
				},
				PositiveBuckets: []int64{1, 2, -2, 1, -1, 0},
			},
			expectedCumulativeBuckets: []Bucket{
				{Upper: 0.00390625, Count: 1}, // -2
				{Upper: 0.0625, Count: 4},     // -1
				{Upper: 1, Count: 5},          // 0
				{Upper: 16, Count: 7},         // 1

				{Upper: 256, Count: 7},  // 2 (gap)
				{Upper: 4096, Count: 7}, // 3 (gap)

				{Upper: 65536, Count: 8},   // 4
				{Upper: 1048576, Count: 9}, // 5
			},
		},
		{
			histogram: Histogram{
				Schema: -1,
				PositiveSpans: []Span{
					{Offset: -2, Length: 5}, // -2 -1 0 1 2
				},
				PositiveBuckets: []int64{1, 2, -2, 1, -1},
			},
			expectedCumulativeBuckets: []Bucket{
				{Upper: 0.0625, Count: 1}, // -2
				{Upper: 0.25, Count: 4},   // -1
				{Upper: 1, Count: 5},      // 0
				{Upper: 4, Count: 7},      // 1
				{Upper: 16, Count: 8},     // 2
			},
		},
	}

	for i, c := range cases {
		t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
			it := c.histogram.CumulativeBucketIterator()
			actualBuckets := make([]Bucket, 0, len(c.expectedCumulativeBuckets))
			for it.Next() {
				actualBuckets = append(actualBuckets, it.At())
			}
			require.NoError(t, it.Err())
			require.Equal(t, c.expectedCumulativeBuckets, actualBuckets)
		})
	}
}
@ -1,165 +0,0 @@ |
||||
// Copyright 2021 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package histogram |
||||
|
||||
import ( |
||||
"fmt" |
||||
"testing" |
||||
|
||||
"github.com/stretchr/testify/require" |
||||
) |
||||
|
||||
// TestCumulativeExpandSparseHistogram checks that the cumulative expansion
// visits every bucket index from the first to the last populated one —
// including the gap buckets between spans — with correctly accumulated
// counts.
func TestCumulativeExpandSparseHistogram(t *testing.T) {
	cases := []struct {
		hist       SparseHistogram
		expBuckets []Bucket
	}{
		{
			hist: SparseHistogram{
				Schema: 0,
				PositiveSpans: []Span{
					{Offset: 0, Length: 2},
					{Offset: 1, Length: 2},
				},
				PositiveBuckets: []int64{1, 1, -1, 0},
			},
			expBuckets: []Bucket{
				{Le: 1, Count: 1},
				{Le: 2, Count: 3},

				{Le: 4, Count: 3}, // Gap bucket, count stays flat.

				{Le: 8, Count: 4},
				{Le: 16, Count: 5},
			},
		},
		{
			hist: SparseHistogram{
				Schema: 0,
				PositiveSpans: []Span{
					{Offset: 0, Length: 5},
					{Offset: 1, Length: 1},
				},
				PositiveBuckets: []int64{1, 2, -2, 1, -1, 0},
			},
			expBuckets: []Bucket{
				{Le: 1, Count: 1},
				{Le: 2, Count: 4},
				{Le: 4, Count: 5},
				{Le: 8, Count: 7},

				{Le: 16, Count: 8},

				{Le: 32, Count: 8}, // Gap bucket, count stays flat.
				{Le: 64, Count: 9},
			},
		},
		{
			hist: SparseHistogram{
				Schema: 0,
				PositiveSpans: []Span{
					{Offset: 0, Length: 7},
				},
				PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0},
			},
			expBuckets: []Bucket{
				{Le: 1, Count: 1},
				{Le: 2, Count: 4},
				{Le: 4, Count: 5},
				{Le: 8, Count: 7},
				{Le: 16, Count: 8},
				{Le: 32, Count: 9},
				{Le: 64, Count: 10},
			},
		},
		{
			hist: SparseHistogram{
				Schema: 3,
				PositiveSpans: []Span{
					{Offset: -5, Length: 2}, // -5 -4
					{Offset: 2, Length: 3},  // -1 0 1
					{Offset: 2, Length: 2},  // 4 5
				},
				PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 3},
			},
			// NOTE(review): The index comments below were corrected; they
			// used to be off by one from index 0 onwards. The bucket with
			// upper boundary 1 is always bucket 0 (here, 1.0905… = 2^(1/8)
			// is index 1, …, 1.5422… = 2^(5/8) is index 5).
			expBuckets: []Bucket{
				{Le: 0.6484197773255048, Count: 1}, // -5
				{Le: 0.7071067811865475, Count: 4}, // -4

				{Le: 0.7711054127039704, Count: 4}, // -3 (gap)
				{Le: 0.8408964152537144, Count: 4}, // -2 (gap)

				{Le: 0.9170040432046711, Count: 5}, // -1
				{Le: 1, Count: 7},                  // 0
				{Le: 1.0905077326652577, Count: 8}, // 1

				{Le: 1.189207115002721, Count: 8},  // 2 (gap)
				{Le: 1.2968395546510096, Count: 8}, // 3 (gap)

				{Le: 1.414213562373095, Count: 9},   // 4
				{Le: 1.5422108254079407, Count: 13}, // 5
			},
		},
		{
			hist: SparseHistogram{
				Schema: -2,
				PositiveSpans: []Span{
					{Offset: -2, Length: 4}, // -2 -1 0 1
					{Offset: 2, Length: 2},  // 4 5
				},
				PositiveBuckets: []int64{1, 2, -2, 1, -1, 0},
			},
			expBuckets: []Bucket{
				{Le: 0.00390625, Count: 1}, // -2
				{Le: 0.0625, Count: 4},     // -1
				{Le: 1, Count: 5},          // 0
				{Le: 16, Count: 7},         // 1

				{Le: 256, Count: 7},  // 2 (gap)
				{Le: 4096, Count: 7}, // 3 (gap)

				{Le: 65536, Count: 8},   // 4
				{Le: 1048576, Count: 9}, // 5
			},
		},
		{
			hist: SparseHistogram{
				Schema: -1,
				PositiveSpans: []Span{
					{Offset: -2, Length: 5}, // -2 -1 0 1 2
				},
				PositiveBuckets: []int64{1, 2, -2, 1, -1},
			},
			expBuckets: []Bucket{
				{Le: 0.0625, Count: 1}, // -2
				{Le: 0.25, Count: 4},   // -1
				{Le: 1, Count: 5},      // 0
				{Le: 4, Count: 7},      // 1
				{Le: 16, Count: 8},     // 2
			},
		},
	}

	for i, c := range cases {
		t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
			it := CumulativeExpandSparseHistogram(c.hist)
			actBuckets := make([]Bucket, 0, len(c.expBuckets))
			for it.Next() {
				actBuckets = append(actBuckets, it.At())
			}
			require.NoError(t, it.Err())
			require.Equal(t, c.expBuckets, actBuckets)
		})
	}
}
@ -1,943 +0,0 @@ |
||||
// Copyright 2021 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// The code in this file was largely written by Damian Gryski as part of
|
||||
// https://github.com/dgryski/go-tsz and published under the license below.
|
||||
// It was modified to accommodate reading from byte slices without modifying
|
||||
// the underlying bytes, which would panic when reading from mmap'd
|
||||
// read-only byte slices.
|
||||
|
||||
// Copyright (c) 2015,2016 Damian Gryski <damian@gryski.com>
|
||||
// All rights reserved.
|
||||
|
||||
// Redistribution and use in source and binary forms, with or without
|
||||
// modification, are permitted provided that the following conditions are met:
|
||||
|
||||
// * Redistributions of source code must retain the above copyright notice,
|
||||
// this list of conditions and the following disclaimer.
|
||||
//
|
||||
// * Redistributions in binary form must reproduce the above copyright notice,
|
||||
// this list of conditions and the following disclaimer in the documentation
|
||||
// and/or other materials provided with the distribution.
|
||||
//
|
||||
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
|
||||
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
||||
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
|
||||
// FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
||||
// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
||||
// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||||
// OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
package chunkenc |
||||
|
||||
import ( |
||||
"encoding/binary" |
||||
"math" |
||||
"math/bits" |
||||
|
||||
"github.com/prometheus/prometheus/pkg/histogram" |
||||
"github.com/prometheus/prometheus/pkg/value" |
||||
) |
||||
|
||||
const () |
||||
|
||||
// HistoChunk holds sparse histogram encoded sample data.
//
// Each appended sample carries a schema that defines the resolution (number
// of buckets per power of 2). Currently, valid numbers are -4 <= n <= 8.
// They are all for base-2 bucket schemas, where 1 is a bucket boundary in
// each case, and then each power of two is divided into 2^n logarithmic
// buckets. Or in other words, each bucket boundary is the previous boundary
// times 2^(2^-n). In the future, more bucket schemas may be added using
// numbers < -4 or > 8. The bucket with upper boundary of 1 is always bucket
// 0. Then negative numbers for smaller boundaries and positive for uppers.
//
// The fields are stored like so:
//
//	field           ts    count zeroCount sum  []posbuckets []negbuckets
//	observation 1   raw   raw   raw       raw  []raw        []raw
//	observation 2   delta delta delta     xor  []delta      []delta
//	observation >2  dod   dod   dod       xor  []dod        []dod
//
// TODO: also encode the zero bucket threshold.
type HistoChunk struct {
	b bstream // Bit stream holding the 2-byte sample count, header byte, metadata, and samples.
}
||||
|
||||
// NewHistoChunk returns a new chunk with Histo encoding of the given size.
|
||||
func NewHistoChunk() *HistoChunk { |
||||
b := make([]byte, 3, 128) |
||||
return &HistoChunk{b: bstream{stream: b, count: 0}} |
||||
} |
||||
|
||||
// Encoding returns the encoding type (sparse histogram).
func (c *HistoChunk) Encoding() Encoding {
	return EncSHS
}
||||
|
||||
// Bytes returns the underlying byte slice of the chunk.
func (c *HistoChunk) Bytes() []byte {
	return c.b.bytes()
}
||||
|
||||
// NumSamples returns the number of samples in the chunk. The count is
// stored big-endian in the first two bytes of the chunk.
func (c *HistoChunk) NumSamples() int {
	return int(binary.BigEndian.Uint16(c.Bytes()))
}
||||
|
||||
// Meta returns the histogram metadata: schema, zero threshold, and positive
// and negative span layouts.
//
// Callers may only call this on chunks that have at least one sample;
// calling it on an empty chunk panics.
func (c *HistoChunk) Meta() (int32, float64, []histogram.Span, []histogram.Span, error) {
	if c.NumSamples() == 0 {
		panic("HistoChunk.Meta() called on an empty chunk")
	}
	// Skip the 2-byte sample count; the metadata follows it.
	b := newBReader(c.Bytes()[2:])
	return readHistoChunkMeta(&b)
}
||||
|
||||
// CounterResetHeader defines the first 2 bits of the chunk header.
type CounterResetHeader byte

// Valid CounterResetHeader values, stored in the upper two bits of the
// third chunk byte (see SetCounterResetHeader).
const (
	// CounterReset marks a detected counter reset.
	CounterReset CounterResetHeader = 0b10000000
	// NotCounterReset marks the absence of a counter reset.
	NotCounterReset CounterResetHeader = 0b01000000
	// GaugeType marks gauge-type data — presumably counter-reset detection
	// does not apply; confirm with callers.
	GaugeType CounterResetHeader = 0b11000000
	// UnknownCounterReset means counter-reset status is not known.
	UnknownCounterReset CounterResetHeader = 0b00000000
)
||||
|
||||
// SetCounterResetHeader sets the counter reset header.
|
||||
func (c *HistoChunk) SetCounterResetHeader(h CounterResetHeader) { |
||||
switch h { |
||||
case CounterReset, NotCounterReset, GaugeType, UnknownCounterReset: |
||||
bytes := c.Bytes() |
||||
bytes[2] = (bytes[2] & 0b00111111) | byte(h) |
||||
default: |
||||
panic("invalid CounterResetHeader type") |
||||
} |
||||
} |
||||
|
||||
// GetCounterResetHeader returns the info about the first 2 bits of the
// chunk header (the counter reset header).
func (c *HistoChunk) GetCounterResetHeader() CounterResetHeader {
	return CounterResetHeader(c.Bytes()[2] & 0b11000000)
}
||||
|
||||
// Compact implements the Chunk interface. It shrinks the backing slice to
// the used length, but only when the spare capacity exceeds
// chunkCompactCapacityThreshold, to avoid copying on every call.
func (c *HistoChunk) Compact() {
	if l := len(c.b.stream); cap(c.b.stream) > l+chunkCompactCapacityThreshold {
		buf := make([]byte, l)
		copy(buf, c.b.stream)
		c.b.stream = buf
	}
}
||||
|
||||
// Appender implements the Chunk interface. It returns an appender primed
// with the state the chunk would have if all existing samples had just been
// appended.
func (c *HistoChunk) Appender() (Appender, error) {
	it := c.iterator(nil)

	// To get an appender, we must know the state it would have if we had
	// appended all existing data from scratch. We iterate through the end
	// and populate via the iterator's state.
	for it.Next() {
	}
	if err := it.Err(); err != nil {
		return nil, err
	}

	a := &HistoAppender{
		b: &c.b,

		schema:          it.schema,
		zeroThreshold:   it.zeroThreshold,
		posSpans:        it.posSpans,
		negSpans:        it.negSpans,
		t:               it.t,
		cnt:             it.cnt,
		zcnt:            it.zcnt,
		tDelta:          it.tDelta,
		cntDelta:        it.cntDelta,
		zcntDelta:       it.zcntDelta,
		posbuckets:      it.posbuckets,
		negbuckets:      it.negbuckets,
		posbucketsDelta: it.posbucketsDelta,
		negbucketsDelta: it.negbucketsDelta,

		sum:      it.sum,
		leading:  it.leading,
		trailing: it.trailing,

		buf64: make([]byte, binary.MaxVarintLen64),
	}
	if binary.BigEndian.Uint16(a.b.bytes()) == 0 {
		// The chunk is empty: mark the XOR leading-zeros state as unused
		// (0xff) so the first sum write stores the full bit window.
		a.leading = 0xff
	}
	return a, nil
}
||||
|
||||
func countSpans(spans []histogram.Span) int { |
||||
var cnt int |
||||
for _, s := range spans { |
||||
cnt += int(s.Length) |
||||
} |
||||
return cnt |
||||
} |
||||
|
||||
// newHistoIterator returns an iterator over the given chunk bytes,
// positioned before the first sample.
func newHistoIterator(b []byte) *histoIterator {
	it := &histoIterator{
		br:       newBReader(b),
		numTotal: binary.BigEndian.Uint16(b),
		t:        math.MinInt64,
	}
	// The first 2 bytes contain chunk headers.
	// We skip that for actual samples.
	_, _ = it.br.readBits(16)
	return it
}
||||
|
||||
// iterator returns an iterator over the chunk's samples. Recycling of the
// passed-in iterator is currently disabled (see TODO below); a fresh
// iterator is always allocated.
func (c *HistoChunk) iterator(it Iterator) *histoIterator {
	// TODO: Fix this; it was taken from xor.go. // dieter: not sure what the purpose of this is.
	// Should iterators guarantee to act on a copy of the data so it doesn't lock append?
	// When using striped locks to guard access to chunks, probably yes.
	// Could only copy data if the chunk is not completed yet.
	//if histoIter, ok := it.(*histoIterator); ok {
	//	histoIter.Reset(c.b.bytes())
	//	return histoIter
	//}
	return newHistoIterator(c.b.bytes())
}
||||
|
||||
// Iterator implements the Chunk interface.
func (c *HistoChunk) Iterator(it Iterator) Iterator {
	return c.iterator(it)
}
||||
|
||||
// HistoAppender is an Appender implementation for sparse histograms.
type HistoAppender struct {
	b *bstream // The chunk's bit stream, shared with the owning HistoChunk.

	// Metadata:
	schema             int32
	zeroThreshold      float64
	posSpans, negSpans []histogram.Span

	// For the fields that are tracked as dod's. Note that we expect to
	// handle negative deltas (e.g. resets) by creating new chunks, we still
	// want to support it in general hence signed integer types.
	t                           int64
	cnt, zcnt                   uint64
	tDelta, cntDelta, zcntDelta int64

	// Last appended bucket values and the deltas used for them.
	posbuckets, negbuckets           []int64
	posbucketsDelta, negbucketsDelta []int64

	// The sum is Gorilla xor encoded.
	sum float64
	// leading == 0xff marks the XOR bit-window state as unset (empty
	// chunk); see Appender() and writeSumDelta.
	leading  uint8
	trailing uint8

	buf64 []byte // For working on varint64's.
}
||||
|
||||
func putVarint(b *bstream, buf []byte, x int64) { |
||||
for _, byt := range buf[:binary.PutVarint(buf, x)] { |
||||
b.writeByte(byt) |
||||
} |
||||
} |
||||
|
||||
func putUvarint(b *bstream, buf []byte, x uint64) { |
||||
for _, byt := range buf[:binary.PutUvarint(buf, x)] { |
||||
b.writeByte(byt) |
||||
} |
||||
} |
||||
|
||||
// Append implements Appender. This implementation does nothing for now.
// TODO(beorn7): Implement in a meaningful way, i.e. we need to support
// appending of stale markers, but this should never be used for "real"
// samples.
func (a *HistoAppender) Append(int64, float64) {}
||||
|
||||
// Appendable returns whether the chunk can be appended to, and if so
// whether any recoding needs to happen using the provided interjections
// (in case of any new buckets, positive or negative range, respectively).
//
// The chunk is not appendable if:
//
//   - the schema has changed
//   - the zero bucket threshold has changed
//   - any buckets disappeared
//   - there was a counter reset in the count of observations or in any
//     bucket, including the zero bucket
//   - the last sample in the chunk was stale while the current sample is not
//
// It returns an additional boolean set to true if it is not appendable
// because of a counter reset. If the given sample is stale, it will always
// return true. If counterReset is true, okToAppend MUST be false.
func (a *HistoAppender) Appendable(h histogram.SparseHistogram) (posInterjections []Interjection, negInterjections []Interjection, okToAppend bool, counterReset bool) {
	if value.IsStaleNaN(h.Sum) {
		// This is a stale sample whose buckets and spans don't matter.
		okToAppend = true
		return
	}
	if value.IsStaleNaN(a.sum) {
		// If the last sample was stale, then we can only accept stale
		// samples in this chunk.
		return
	}

	if h.Count < a.cnt {
		// There has been a counter reset.
		counterReset = true
		return
	}

	if h.Schema != a.schema || h.ZeroThreshold != a.zeroThreshold {
		return
	}

	if h.ZeroCount < a.zcnt {
		// There has been a counter reset since ZeroThreshold didn't change.
		counterReset = true
		return
	}

	var ok bool
	posInterjections, ok = compareSpans(a.posSpans, h.PositiveSpans)
	if !ok {
		// Span layouts are incompatible; treated as a counter reset.
		counterReset = true
		return
	}
	negInterjections, ok = compareSpans(a.negSpans, h.NegativeSpans)
	if !ok {
		counterReset = true
		return
	}

	if counterResetInAnyBucket(a.posbuckets, h.PositiveBuckets, a.posSpans, h.PositiveSpans) ||
		counterResetInAnyBucket(a.negbuckets, h.NegativeBuckets, a.negSpans, h.NegativeSpans) {
		counterReset, posInterjections, negInterjections = true, nil, nil
		return
	}

	okToAppend = true
	return
}
||||
|
||||
// counterResetInAnyBucket returns true if there was a counter reset for any
// bucket. This should be called only when buckets are the same or new
// buckets were added; it does not handle the case of buckets missing.
// The bucket slices hold deltas, so absolute values are accumulated while
// walking both span layouts in parallel.
func counterResetInAnyBucket(oldBuckets, newBuckets []int64, oldSpans, newSpans []histogram.Span) bool {
	if len(oldSpans) == 0 || len(oldBuckets) == 0 {
		return false
	}

	oldSpanSliceIdx, newSpanSliceIdx := 0, 0                   // Index for the span slices.
	oldInsideSpanIdx, newInsideSpanIdx := uint32(0), uint32(0) // Index inside a span.
	oldIdx, newIdx := oldSpans[0].Offset, newSpans[0].Offset

	oldBucketSliceIdx, newBucketSliceIdx := 0, 0 // Index inside bucket slice.
	oldVal, newVal := oldBuckets[0], newBuckets[0]

	// Since we assume that new spans won't have missing buckets, there will
	// never be a case where the old index will not find a matching new
	// index.
	for {
		if oldIdx == newIdx {
			// Same bucket index in both layouts: compare absolute values.
			if newVal < oldVal {
				return true
			}
		}

		if oldIdx <= newIdx {
			// Moving ahead old bucket and span by 1 index.
			if oldInsideSpanIdx == oldSpans[oldSpanSliceIdx].Length-1 {
				// Current span is over.
				oldSpanSliceIdx++
				oldInsideSpanIdx = 0
				if oldSpanSliceIdx >= len(oldSpans) {
					// All old spans are over.
					break
				}
				oldIdx += 1 + oldSpans[oldSpanSliceIdx].Offset
			} else {
				oldInsideSpanIdx++
				oldIdx++
			}
			oldBucketSliceIdx++
			oldVal += oldBuckets[oldBucketSliceIdx] // Accumulate the delta.
		}

		if oldIdx > newIdx {
			// Moving ahead new bucket and span by 1 index.
			if newInsideSpanIdx == newSpans[newSpanSliceIdx].Length-1 {
				// Current span is over.
				newSpanSliceIdx++
				newInsideSpanIdx = 0
				if newSpanSliceIdx >= len(newSpans) {
					// All new spans are over.
					// This should not happen, old spans above should catch this first.
					panic("new spans over before old spans in counterReset")
				}
				newIdx += 1 + newSpans[newSpanSliceIdx].Offset
			} else {
				newInsideSpanIdx++
				newIdx++
			}
			newBucketSliceIdx++
			newVal += newBuckets[newBucketSliceIdx] // Accumulate the delta.
		}
	}

	return false
}
||||
|
||||
// AppendHistogram appends a SparseHistogram to the chunk. We assume the
// histogram is properly structured, e.g. that the number of pos/neg buckets
// used corresponds to the number conveyed by the pos/neg span structures.
// Callers must call Appendable() first and act accordingly!
func (a *HistoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) {
	var tDelta, cntDelta, zcntDelta int64
	num := binary.BigEndian.Uint16(a.b.bytes())

	if value.IsStaleNaN(h.Sum) {
		// Emptying out other fields to write no buckets, and an empty meta
		// in case of first histogram in the chunk.
		h = histogram.SparseHistogram{Sum: h.Sum}
	}

	switch num {
	case 0:
		// The first append gets the privilege to dictate the metadata,
		// but it's also responsible for encoding it into the chunk!
		writeHistoChunkMeta(a.b, h.Schema, h.ZeroThreshold, h.PositiveSpans, h.NegativeSpans)
		a.schema = h.Schema
		a.zeroThreshold = h.ZeroThreshold
		a.posSpans, a.negSpans = h.PositiveSpans, h.NegativeSpans
		numPosBuckets, numNegBuckets := countSpans(h.PositiveSpans), countSpans(h.NegativeSpans)
		a.posbuckets = make([]int64, numPosBuckets)
		a.negbuckets = make([]int64, numNegBuckets)
		a.posbucketsDelta = make([]int64, numPosBuckets)
		a.negbucketsDelta = make([]int64, numNegBuckets)

		// Now store the actual data, all in raw form.
		putVarint(a.b, a.buf64, t)
		putUvarint(a.b, a.buf64, h.Count)
		putUvarint(a.b, a.buf64, h.ZeroCount)
		a.b.writeBits(math.Float64bits(h.Sum), 64)
		for _, buck := range h.PositiveBuckets {
			putVarint(a.b, a.buf64, buck)
		}
		for _, buck := range h.NegativeBuckets {
			putVarint(a.b, a.buf64, buck)
		}
	case 1:
		// Second sample: store deltas (and the sum XOR encoded).
		tDelta = t - a.t
		cntDelta = int64(h.Count) - int64(a.cnt)
		zcntDelta = int64(h.ZeroCount) - int64(a.zcnt)

		if value.IsStaleNaN(h.Sum) {
			cntDelta, zcntDelta = 0, 0
		}

		putVarint(a.b, a.buf64, tDelta)
		putVarint(a.b, a.buf64, cntDelta)
		putVarint(a.b, a.buf64, zcntDelta)

		a.writeSumDelta(h.Sum)

		for i, buck := range h.PositiveBuckets {
			delta := buck - a.posbuckets[i]
			putVarint(a.b, a.buf64, delta)
			a.posbucketsDelta[i] = delta
		}
		for i, buck := range h.NegativeBuckets {
			delta := buck - a.negbuckets[i]
			putVarint(a.b, a.buf64, delta)
			a.negbucketsDelta[i] = delta
		}

	default:
		// Third or later sample: store delta-of-deltas (and the sum XOR
		// encoded).
		tDelta = t - a.t
		cntDelta = int64(h.Count) - int64(a.cnt)
		zcntDelta = int64(h.ZeroCount) - int64(a.zcnt)

		tDod := tDelta - a.tDelta
		cntDod := cntDelta - a.cntDelta
		zcntDod := zcntDelta - a.zcntDelta

		if value.IsStaleNaN(h.Sum) {
			cntDod, zcntDod = 0, 0
		}

		putInt64VBBucket(a.b, tDod)
		putInt64VBBucket(a.b, cntDod)
		putInt64VBBucket(a.b, zcntDod)

		a.writeSumDelta(h.Sum)

		for i, buck := range h.PositiveBuckets {
			delta := buck - a.posbuckets[i]
			dod := delta - a.posbucketsDelta[i]
			putInt64VBBucket(a.b, dod)
			a.posbucketsDelta[i] = delta
		}
		for i, buck := range h.NegativeBuckets {
			delta := buck - a.negbuckets[i]
			dod := delta - a.negbucketsDelta[i]
			putInt64VBBucket(a.b, dod)
			a.negbucketsDelta[i] = delta
		}
	}

	// Bump the sample count stored in the first two bytes.
	binary.BigEndian.PutUint16(a.b.bytes(), num+1)

	// Remember this sample's values for the next append.
	a.t = t
	a.cnt = h.Count
	a.zcnt = h.ZeroCount
	a.tDelta = tDelta
	a.cntDelta = cntDelta
	a.zcntDelta = zcntDelta

	a.posbuckets, a.negbuckets = h.PositiveBuckets, h.NegativeBuckets
	// Note that the bucket deltas were already updated above.
	a.sum = h.Sum
}
||||
|
||||
// Recode converts the current chunk to accommodate an expansion of the set
// of (positive and/or negative) buckets used, according to the provided
// interjections, resulting in the honoring of the provided new posSpans and
// negSpans. It returns the new chunk and an appender primed for it.
func (a *HistoAppender) Recode(posInterjections, negInterjections []Interjection, posSpans, negSpans []histogram.Span) (Chunk, Appender) {
	// TODO(beorn7): This currently just decodes everything and then encodes
	// it again with the new span layout. This can probably be done in-place
	// by editing the chunk. But let's first see how expensive it is in the
	// big picture.
	byts := a.b.bytes()
	it := newHistoIterator(byts)
	hc := NewHistoChunk()
	app, err := hc.Appender()
	if err != nil {
		panic(err)
	}
	numPosBuckets, numNegBuckets := countSpans(posSpans), countSpans(negSpans)

	for it.Next() {
		tOld, hOld := it.AtHistogram()

		// We have to newly allocate slices for the modified buckets here
		// because they are kept by the appender until the next append.
		// TODO(beorn7): We might be able to optimize this.
		posBuckets := make([]int64, numPosBuckets)
		negBuckets := make([]int64, numNegBuckets)

		// Save the modified histogram to the new chunk.
		hOld.PositiveSpans, hOld.NegativeSpans = posSpans, negSpans
		if len(posInterjections) > 0 {
			hOld.PositiveBuckets = interject(hOld.PositiveBuckets, posBuckets, posInterjections)
		}
		if len(negInterjections) > 0 {
			hOld.NegativeBuckets = interject(hOld.NegativeBuckets, negBuckets, negInterjections)
		}
		app.AppendHistogram(tOld, hOld)
	}

	// Carry over the counter-reset flag bits from the old chunk's header.
	hc.SetCounterResetHeader(CounterResetHeader(byts[2] & 0b11000000))
	return hc, app
}
||||
|
||||
func (a *HistoAppender) writeSumDelta(v float64) { |
||||
vDelta := math.Float64bits(v) ^ math.Float64bits(a.sum) |
||||
|
||||
if vDelta == 0 { |
||||
a.b.writeBit(zero) |
||||
return |
||||
} |
||||
a.b.writeBit(one) |
||||
|
||||
leading := uint8(bits.LeadingZeros64(vDelta)) |
||||
trailing := uint8(bits.TrailingZeros64(vDelta)) |
||||
|
||||
// Clamp number of leading zeros to avoid overflow when encoding.
|
||||
if leading >= 32 { |
||||
leading = 31 |
||||
} |
||||
|
||||
if a.leading != 0xff && leading >= a.leading && trailing >= a.trailing { |
||||
a.b.writeBit(zero) |
||||
a.b.writeBits(vDelta>>a.trailing, 64-int(a.leading)-int(a.trailing)) |
||||
} else { |
||||
a.leading, a.trailing = leading, trailing |
||||
|
||||
a.b.writeBit(one) |
||||
a.b.writeBits(uint64(leading), 5) |
||||
|
||||
// Note that if leading == trailing == 0, then sigbits == 64. But that value doesn't actually fit into the 6 bits we have.
|
||||
// Luckily, we never need to encode 0 significant bits, since that would put us in the other case (vdelta == 0).
|
||||
// So instead we write out a 0 and adjust it back to 64 on unpacking.
|
||||
sigbits := 64 - leading - trailing |
||||
a.b.writeBits(uint64(sigbits), 6) |
||||
a.b.writeBits(vDelta>>trailing, int(sigbits)) |
||||
} |
||||
} |
||||
|
||||
// histoIterator iterates over the samples of a histogram chunk.
type histoIterator struct {
	br       bstreamReader
	numTotal uint16 // Total number of samples in the chunk.
	numRead  uint16 // Number of samples read so far.

	// Metadata, decoded from the chunk header on the first Next call:
	schema             int32
	zeroThreshold      float64
	posSpans, negSpans []histogram.Span

	// Fields tracked as delta-of-deltas (dod):
	t                           int64
	cnt, zcnt                   uint64
	tDelta, cntDelta, zcntDelta int64

	posbuckets, negbuckets           []int64
	posbucketsDelta, negbucketsDelta []int64

	// The sum is Gorilla xor encoded; leading/trailing track the current
	// XOR window.
	sum      float64
	leading  uint8
	trailing uint8

	err error // First error encountered; sticky once set.
}
||||
|
||||
func (it *histoIterator) Seek(t int64) bool { |
||||
if it.err != nil { |
||||
return false |
||||
} |
||||
|
||||
for t > it.t || it.numRead == 0 { |
||||
if !it.Next() { |
||||
return false |
||||
} |
||||
} |
||||
return true |
||||
} |
||||
|
||||
// At implements Iterator. It always panics: this chunk type carries histogram
// samples, so AtHistogram must be used instead.
func (it *histoIterator) At() (int64, float64) {
	panic("cannot call histoIterator.At().")
}
||||
|
||||
// ChunkEncoding returns the encoding of the chunk this iterator reads.
func (it *histoIterator) ChunkEncoding() Encoding {
	return EncSHS
}
||||
|
||||
func (it *histoIterator) AtHistogram() (int64, histogram.SparseHistogram) { |
||||
if value.IsStaleNaN(it.sum) { |
||||
return it.t, histogram.SparseHistogram{Sum: it.sum} |
||||
} |
||||
return it.t, histogram.SparseHistogram{ |
||||
Count: it.cnt, |
||||
ZeroCount: it.zcnt, |
||||
Sum: it.sum, |
||||
ZeroThreshold: it.zeroThreshold, |
||||
Schema: it.schema, |
||||
PositiveSpans: it.posSpans, |
||||
NegativeSpans: it.negSpans, |
||||
PositiveBuckets: it.posbuckets, |
||||
NegativeBuckets: it.negbuckets, |
||||
} |
||||
} |
||||
|
||||
// Err returns the first error encountered while iterating, if any.
func (it *histoIterator) Err() error {
	return it.err
}
||||
|
||||
func (it *histoIterator) Reset(b []byte) { |
||||
// The first 2 bytes contain chunk headers.
|
||||
// We skip that for actual samples.
|
||||
it.br = newBReader(b[2:]) |
||||
it.numTotal = binary.BigEndian.Uint16(b) |
||||
it.numRead = 0 |
||||
|
||||
it.t, it.cnt, it.zcnt = 0, 0, 0 |
||||
it.tDelta, it.cntDelta, it.zcntDelta = 0, 0, 0 |
||||
|
||||
for i := range it.posbuckets { |
||||
it.posbuckets[i] = 0 |
||||
it.posbucketsDelta[i] = 0 |
||||
} |
||||
for i := range it.negbuckets { |
||||
it.negbuckets[i] = 0 |
||||
it.negbucketsDelta[i] = 0 |
||||
} |
||||
|
||||
it.sum = 0 |
||||
it.leading = 0 |
||||
it.trailing = 0 |
||||
it.err = nil |
||||
} |
||||
|
||||
// Next advances the iterator by one sample and reports whether one is
// available. The first sample also decodes the chunk metadata and reads all
// fields raw; the second sample reads simple deltas; all further samples read
// delta-of-deltas. The sum is always XOR decoded via readSum. For a staleness
// marker (stale NaN sum), no bucket data follows in the stream, so none is
// read.
func (it *histoIterator) Next() bool {
	if it.err != nil || it.numRead == it.numTotal {
		return false
	}

	if it.numRead == 0 {
		// The first read is responsible for reading chunk metadata and
		// initializing fields that depend on it. The counter reset info
		// is given at the chunk level, hence it is discarded here.
		schema, zeroThreshold, posSpans, negSpans, err := readHistoChunkMeta(&it.br)
		if err != nil {
			it.err = err
			return false
		}
		it.schema = schema
		it.zeroThreshold = zeroThreshold
		it.posSpans, it.negSpans = posSpans, negSpans
		numPosBuckets, numNegBuckets := countSpans(posSpans), countSpans(negSpans)
		if numPosBuckets > 0 {
			it.posbuckets = make([]int64, numPosBuckets)
			it.posbucketsDelta = make([]int64, numPosBuckets)
		}
		if numNegBuckets > 0 {
			it.negbuckets = make([]int64, numNegBuckets)
			it.negbucketsDelta = make([]int64, numNegBuckets)
		}

		// Now read the actual data: all fields raw for the first sample.
		t, err := binary.ReadVarint(&it.br)
		if err != nil {
			it.err = err
			return false
		}
		it.t = t

		cnt, err := binary.ReadUvarint(&it.br)
		if err != nil {
			it.err = err
			return false
		}
		it.cnt = cnt

		zcnt, err := binary.ReadUvarint(&it.br)
		if err != nil {
			it.err = err
			return false
		}
		it.zcnt = zcnt

		sum, err := it.br.readBits(64)
		if err != nil {
			it.err = err
			return false
		}
		it.sum = math.Float64frombits(sum)

		for i := range it.posbuckets {
			v, err := binary.ReadVarint(&it.br)
			if err != nil {
				it.err = err
				return false
			}
			it.posbuckets[i] = v
		}
		for i := range it.negbuckets {
			v, err := binary.ReadVarint(&it.br)
			if err != nil {
				it.err = err
				return false
			}
			it.negbuckets[i] = v
		}

		it.numRead++
		return true
	}

	if it.numRead == 1 {
		// The second sample is encoded as simple deltas to the first.
		tDelta, err := binary.ReadVarint(&it.br)
		if err != nil {
			it.err = err
			return false
		}
		it.tDelta = tDelta
		it.t += int64(it.tDelta)

		cntDelta, err := binary.ReadVarint(&it.br)
		if err != nil {
			it.err = err
			return false
		}
		it.cntDelta = cntDelta
		it.cnt = uint64(int64(it.cnt) + it.cntDelta)

		zcntDelta, err := binary.ReadVarint(&it.br)
		if err != nil {
			it.err = err
			return false
		}
		it.zcntDelta = zcntDelta
		it.zcnt = uint64(int64(it.zcnt) + it.zcntDelta)

		ok := it.readSum()
		if !ok {
			return false
		}

		// Presumably a staleness marker is written without bucket data
		// (the appender empties the histogram for stale NaN sums), so
		// stop reading here.
		if value.IsStaleNaN(it.sum) {
			it.numRead++
			return true
		}

		for i := range it.posbuckets {
			delta, err := binary.ReadVarint(&it.br)
			if err != nil {
				it.err = err
				return false
			}
			it.posbucketsDelta[i] = delta
			it.posbuckets[i] = it.posbuckets[i] + delta
		}

		for i := range it.negbuckets {
			delta, err := binary.ReadVarint(&it.br)
			if err != nil {
				it.err = err
				return false
			}
			it.negbucketsDelta[i] = delta
			it.negbuckets[i] = it.negbuckets[i] + delta
		}

		it.numRead++
		return true
	}

	// Sample 3 and beyond: delta-of-deltas for t, count, and zero count.
	tDod, err := readInt64VBBucket(&it.br)
	if err != nil {
		it.err = err
		return false
	}
	it.tDelta = it.tDelta + tDod
	it.t += it.tDelta

	cntDod, err := readInt64VBBucket(&it.br)
	if err != nil {
		it.err = err
		return false
	}
	it.cntDelta = it.cntDelta + cntDod
	it.cnt = uint64(int64(it.cnt) + it.cntDelta)

	zcntDod, err := readInt64VBBucket(&it.br)
	if err != nil {
		it.err = err
		return false
	}
	it.zcntDelta = it.zcntDelta + zcntDod
	it.zcnt = uint64(int64(it.zcnt) + it.zcntDelta)

	ok := it.readSum()
	if !ok {
		return false
	}

	// Staleness marker: no bucket data follows (see above).
	if value.IsStaleNaN(it.sum) {
		it.numRead++
		return true
	}

	for i := range it.posbuckets {
		dod, err := readInt64VBBucket(&it.br)
		if err != nil {
			it.err = err
			return false
		}
		it.posbucketsDelta[i] = it.posbucketsDelta[i] + dod
		it.posbuckets[i] = it.posbuckets[i] + it.posbucketsDelta[i]
	}

	for i := range it.negbuckets {
		dod, err := readInt64VBBucket(&it.br)
		if err != nil {
			it.err = err
			return false
		}
		it.negbucketsDelta[i] = it.negbucketsDelta[i] + dod
		it.negbuckets[i] = it.negbuckets[i] + it.negbucketsDelta[i]
	}

	it.numRead++
	return true
}
||||
|
||||
func (it *histoIterator) readSum() bool { |
||||
bit, err := it.br.readBitFast() |
||||
if err != nil { |
||||
bit, err = it.br.readBit() |
||||
} |
||||
if err != nil { |
||||
it.err = err |
||||
return false |
||||
} |
||||
|
||||
if bit == zero { |
||||
// it.sum = it.sum
|
||||
} else { |
||||
bit, err := it.br.readBitFast() |
||||
if err != nil { |
||||
bit, err = it.br.readBit() |
||||
} |
||||
if err != nil { |
||||
it.err = err |
||||
return false |
||||
} |
||||
if bit == zero { |
||||
// reuse leading/trailing zero bits
|
||||
// it.leading, it.trailing = it.leading, it.trailing
|
||||
} else { |
||||
bits, err := it.br.readBitsFast(5) |
||||
if err != nil { |
||||
bits, err = it.br.readBits(5) |
||||
} |
||||
if err != nil { |
||||
it.err = err |
||||
return false |
||||
} |
||||
it.leading = uint8(bits) |
||||
|
||||
bits, err = it.br.readBitsFast(6) |
||||
if err != nil { |
||||
bits, err = it.br.readBits(6) |
||||
} |
||||
if err != nil { |
||||
it.err = err |
||||
return false |
||||
} |
||||
mbits := uint8(bits) |
||||
// 0 significant bits here means we overflowed and we actually need 64; see comment in encoder
|
||||
if mbits == 0 { |
||||
mbits = 64 |
||||
} |
||||
it.trailing = 64 - it.leading - mbits |
||||
} |
||||
|
||||
mbits := 64 - it.leading - it.trailing |
||||
bits, err := it.br.readBitsFast(mbits) |
||||
if err != nil { |
||||
bits, err = it.br.readBits(mbits) |
||||
} |
||||
if err != nil { |
||||
it.err = err |
||||
return false |
||||
} |
||||
vbits := math.Float64bits(it.sum) |
||||
vbits ^= bits << it.trailing |
||||
it.sum = math.Float64frombits(vbits) |
||||
} |
||||
|
||||
return true |
||||
} |
@ -1,261 +0,0 @@ |
||||
// Copyright 2021 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package chunkenc |
||||
|
||||
import ( |
||||
"github.com/prometheus/prometheus/pkg/histogram" |
||||
) |
||||
|
||||
// writeHistoChunkMeta encodes the chunk-level histogram metadata (schema,
// zero threshold, and the positive and negative span layouts) into b.
func writeHistoChunkMeta(b *bstream, schema int32, zeroThreshold float64, posSpans, negSpans []histogram.Span) {
	putInt64VBBucket(b, int64(schema))
	putFloat64VBBucket(b, zeroThreshold)
	putHistoChunkMetaSpans(b, posSpans)
	putHistoChunkMetaSpans(b, negSpans)
}
||||
|
||||
func putHistoChunkMetaSpans(b *bstream, spans []histogram.Span) { |
||||
putInt64VBBucket(b, int64(len(spans))) |
||||
for _, s := range spans { |
||||
putInt64VBBucket(b, int64(s.Length)) |
||||
putInt64VBBucket(b, int64(s.Offset)) |
||||
} |
||||
} |
||||
|
||||
func readHistoChunkMeta(b *bstreamReader) (schema int32, zeroThreshold float64, posSpans []histogram.Span, negSpans []histogram.Span, err error) { |
||||
_, err = b.ReadByte() // The header.
|
||||
if err != nil { |
||||
return |
||||
} |
||||
|
||||
v, err := readInt64VBBucket(b) |
||||
if err != nil { |
||||
return |
||||
} |
||||
schema = int32(v) |
||||
|
||||
zeroThreshold, err = readFloat64VBBucket(b) |
||||
if err != nil { |
||||
return |
||||
} |
||||
|
||||
posSpans, err = readHistoChunkMetaSpans(b) |
||||
if err != nil { |
||||
return |
||||
} |
||||
|
||||
negSpans, err = readHistoChunkMetaSpans(b) |
||||
if err != nil { |
||||
return |
||||
} |
||||
|
||||
return |
||||
} |
||||
|
||||
func readHistoChunkMetaSpans(b *bstreamReader) ([]histogram.Span, error) { |
||||
var spans []histogram.Span |
||||
num, err := readInt64VBBucket(b) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
for i := 0; i < int(num); i++ { |
||||
|
||||
length, err := readInt64VBBucket(b) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
offset, err := readInt64VBBucket(b) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
spans = append(spans, histogram.Span{ |
||||
Length: uint32(length), |
||||
Offset: int32(offset), |
||||
}) |
||||
} |
||||
return spans, nil |
||||
} |
||||
|
||||
// bucketIterator iterates over the absolute bucket indices described by a
// slice of spans.
type bucketIterator struct {
	spans  []histogram.Span
	span   int // Span position of last yielded bucket.
	bucket int // Bucket position within span of last yielded bucket.
	idx    int // Bucket index (globally across all spans) of last yielded bucket.
}
||||
|
||||
func newBucketIterator(spans []histogram.Span) *bucketIterator { |
||||
b := bucketIterator{ |
||||
spans: spans, |
||||
span: 0, |
||||
bucket: -1, |
||||
idx: -1, |
||||
} |
||||
if len(spans) > 0 { |
||||
b.idx += int(spans[0].Offset) |
||||
} |
||||
return &b |
||||
} |
||||
|
||||
func (b *bucketIterator) Next() (int, bool) { |
||||
// we're already out of bounds
|
||||
if b.span >= len(b.spans) { |
||||
return 0, false |
||||
} |
||||
try: |
||||
if b.bucket < int(b.spans[b.span].Length-1) { // try to move within same span.
|
||||
b.bucket++ |
||||
b.idx++ |
||||
return b.idx, true |
||||
} else if b.span < len(b.spans)-1 { // try to move from one span to the next
|
||||
b.span++ |
||||
b.idx += int(b.spans[b.span].Offset + 1) |
||||
b.bucket = 0 |
||||
if b.spans[b.span].Length == 0 { |
||||
// pathological case that should never happen. We can't use this span, let's try again.
|
||||
goto try |
||||
} |
||||
return b.idx, true |
||||
} |
||||
// we're out of options
|
||||
return 0, false |
||||
} |
||||
|
||||
// Interjection describes that num new buckets are introduced before
// processing the pos'th delta from the original slice.
type Interjection struct {
	pos int // Index into the original delta slice before which to insert.
	num int // Number of new buckets to introduce.
}
||||
|
||||
// compareSpans returns the interjections needed to convert a slice of deltas
// from span layout a to a new slice representing the expanded set of buckets
// in layout b, or false if the layouts are incompatible (i.e. buckets in a
// are missing from b).
//
// Example:
//
// Let's say the old buckets look like this (span syntax: [offset, length]):
//
//   spans      : [ 0 , 2 ]               [2,1]           [ 3 , 2 ]                [3,1]       [1,1]
//   bucket idx : [0] [1]    2     3    [4]    5  6  7  [8]  [9]   10  11  12  [13]   14  [15]
//   raw values    6   3               3                 2    4                 5          1
//   deltas        6  -3               0                -1    2                 1         -4
//
// Now a new bucket layout is introduced (one span appended, one unchanged*,
// one prepended, two merged — in that order):
//
//   spans      : [ 0 , 3 ]    [1,1]  [ 1 , 4 ]                 [ 3 , 3 ]
//   bucket idx : [0] [1] [2]  3 [4]  5 [6] [7] [8] [9] 10 11 12 [13] [14] [15]
//   raw values    6   3   0     3      0   0   2   4             5    0    1
//   deltas        6  -3  -3     3     -3   0   2   2             1   -5    1
//
// [*] unchanged in terms of which bucket indices it represents, although its
// offset may have to change if spans ahead of it were disrupted.
//
// Whenever any new buckets are introduced, the subsequent "old" bucket needs
// to readjust its delta to the new base of 0. Thus, to transform the original
// deltas to a new set of deltas matching a layout that adds buckets, a list of
// interjections is all the caller needs.
//
// Note that within compareSpans we don't have to worry about the changes to
// the spans themselves: thanks to the bucket iterators, we work with the more
// useful absolute bucket indices (which directly correspond to the buckets
// that have to be adjusted).
func compareSpans(a, b []histogram.Span) ([]Interjection, bool) {
	ai := newBucketIterator(a)
	bi := newBucketIterator(b)

	var interjections []Interjection

	// When inter.num becomes > 0, inter is a valid interjection that
	// should be yielded once a streak of new buckets ends.
	var inter Interjection

	av, aok := ai.Next()
	bv, bok := bi.Next()
loop:
	for {
		switch {
		case aok && bok:
			switch {
			case av == bv: // Both have an identical value. move on!
				// Finish WIP interjection and reset.
				if inter.num > 0 {
					interjections = append(interjections, inter)
				}
				inter.num = 0
				av, aok = ai.Next()
				bv, bok = bi.Next()
				inter.pos++
			case av < bv: // b misses a value that is in a.
				return interjections, false
			case av > bv: // a misses a value that is in b. Forward b and recompare.
				inter.num++
				bv, bok = bi.Next()
			}
		case aok && !bok: // b misses a value that is in a.
			return interjections, false
		case !aok && bok: // a misses a value that is in b. Forward b and recompare.
			inter.num++
			bv, bok = bi.Next()
		default: // Both iterators ran out. We're done.
			if inter.num > 0 {
				interjections = append(interjections, inter)
			}
			break loop
		}
	}

	return interjections, true
}
||||
|
||||
// interject merges 'in' with the provided interjections and writes them into
|
||||
// 'out', which must already have the appropriate length.
|
||||
func interject(in, out []int64, interjections []Interjection) []int64 { |
||||
var j int // Position in out.
|
||||
var v int64 // The last value seen.
|
||||
var interj int // The next interjection to process.
|
||||
for i, d := range in { |
||||
if interj < len(interjections) && i == interjections[interj].pos { |
||||
|
||||
// We have an interjection!
|
||||
// Add interjection.num new delta values such that their
|
||||
// bucket values equate 0.
|
||||
out[j] = int64(-v) |
||||
j++ |
||||
for x := 1; x < interjections[interj].num; x++ { |
||||
out[j] = 0 |
||||
j++ |
||||
} |
||||
interj++ |
||||
|
||||
// Now save the value from the input. The delta value we
|
||||
// should save is the original delta value + the last
|
||||
// value of the point before the interjection (to undo
|
||||
// the delta that was introduced by the interjection).
|
||||
out[j] = d + v |
||||
j++ |
||||
v = d + v |
||||
continue |
||||
} |
||||
|
||||
// If there was no interjection, the original delta is still
|
||||
// valid.
|
||||
out[j] = d |
||||
j++ |
||||
v += d |
||||
} |
||||
switch interj { |
||||
case len(interjections): |
||||
// All interjections processed. Nothing more to do.
|
||||
case len(interjections) - 1: |
||||
// One more interjection to process at the end.
|
||||
out[j] = int64(-v) |
||||
j++ |
||||
for x := 1; x < interjections[interj].num; x++ { |
||||
out[j] = 0 |
||||
j++ |
||||
} |
||||
default: |
||||
panic("unprocessed interjections left") |
||||
} |
||||
return out |
||||
} |
@ -0,0 +1,934 @@ |
||||
// Copyright 2021 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package chunkenc |
||||
|
||||
import ( |
||||
"encoding/binary" |
||||
"math" |
||||
"math/bits" |
||||
|
||||
"github.com/prometheus/prometheus/model/histogram" |
||||
"github.com/prometheus/prometheus/pkg/value" |
||||
) |
||||
|
||||
const () |
||||
|
||||
// HistogramChunk holds encoded sample data for a sparse, high-resolution
// histogram.
//
// TODO(beorn7): Document the layout of chunk metadata.
//
// Each sample has multiple "fields", stored in the following way (raw = store
// number directly, delta = store delta to the previous number, dod = store
// delta of the delta to the previous number, xor = what we do for regular
// sample values):
//
//	field →     ts    count  zeroCount  sum  []posbuckets  []negbuckets
//	sample 1    raw   raw    raw        raw  []raw         []raw
//	sample 2    delta delta  delta      xor  []delta       []delta
//	sample >2   dod   dod    dod        xor  []dod         []dod
type HistogramChunk struct {
	b bstream
}
||||
|
||||
// NewHistogramChunk returns a new chunk with histogram encoding of the given
|
||||
// size.
|
||||
func NewHistogramChunk() *HistogramChunk { |
||||
b := make([]byte, 3, 128) |
||||
return &HistogramChunk{b: bstream{stream: b, count: 0}} |
||||
} |
||||
|
||||
// Encoding returns the encoding type.
func (c *HistogramChunk) Encoding() Encoding {
	return EncHistogram
}
||||
|
||||
// Bytes returns the underlying byte slice of the chunk.
func (c *HistogramChunk) Bytes() []byte {
	return c.b.bytes()
}
||||
|
||||
// NumSamples returns the number of samples in the chunk, stored in the first
// two bytes in big-endian order.
func (c *HistogramChunk) NumSamples() int {
	return int(binary.BigEndian.Uint16(c.Bytes()))
}
||||
|
||||
// Meta returns the histogram metadata. Only call this on chunks that have at
|
||||
// least one sample.
|
||||
func (c *HistogramChunk) Meta() ( |
||||
schema int32, zeroThreshold float64, |
||||
negativeSpans, positiveSpans []histogram.Span, |
||||
err error, |
||||
) { |
||||
if c.NumSamples() == 0 { |
||||
panic("HistoChunk.Meta() called on an empty chunk") |
||||
} |
||||
b := newBReader(c.Bytes()[2:]) |
||||
return readHistogramChunkMeta(&b) |
||||
} |
||||
|
||||
// CounterResetHeader defines the first 2 bits of the chunk header.
type CounterResetHeader byte

const (
	// CounterReset flags the chunk as starting with a counter reset.
	CounterReset CounterResetHeader = 0b10000000
	// NotCounterReset flags the chunk as not starting with a counter reset.
	NotCounterReset CounterResetHeader = 0b01000000
	// GaugeType flags the chunk as containing a gauge histogram.
	GaugeType CounterResetHeader = 0b11000000
	// UnknownCounterReset flags the counter reset state as unknown.
	UnknownCounterReset CounterResetHeader = 0b00000000
)
||||
|
||||
// SetCounterResetHeader sets the counter reset header.
|
||||
func (c *HistogramChunk) SetCounterResetHeader(h CounterResetHeader) { |
||||
switch h { |
||||
case CounterReset, NotCounterReset, GaugeType, UnknownCounterReset: |
||||
bytes := c.Bytes() |
||||
bytes[2] = (bytes[2] & 0b00111111) | byte(h) |
||||
default: |
||||
panic("invalid CounterResetHeader type") |
||||
} |
||||
} |
||||
|
||||
// GetCounterResetHeader returns the info about the first 2 bits of the chunk
|
||||
// header.
|
||||
func (c *HistogramChunk) GetCounterResetHeader() CounterResetHeader { |
||||
return CounterResetHeader(c.Bytes()[2] & 0b11000000) |
||||
} |
||||
|
||||
// Compact implements the Chunk interface.
|
||||
func (c *HistogramChunk) Compact() { |
||||
if l := len(c.b.stream); cap(c.b.stream) > l+chunkCompactCapacityThreshold { |
||||
buf := make([]byte, l) |
||||
copy(buf, c.b.stream) |
||||
c.b.stream = buf |
||||
} |
||||
} |
||||
|
||||
// Appender implements the Chunk interface.
|
||||
func (c *HistogramChunk) Appender() (Appender, error) { |
||||
it := c.iterator(nil) |
||||
|
||||
// To get an appender, we must know the state it would have if we had
|
||||
// appended all existing data from scratch. We iterate through the end
|
||||
// and populate via the iterator's state.
|
||||
for it.Next() { |
||||
} |
||||
if err := it.Err(); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
a := &HistogramAppender{ |
||||
b: &c.b, |
||||
|
||||
schema: it.schema, |
||||
zThreshold: it.zThreshold, |
||||
pSpans: it.pSpans, |
||||
nSpans: it.nSpans, |
||||
t: it.t, |
||||
cnt: it.cnt, |
||||
zCnt: it.zCnt, |
||||
tDelta: it.tDelta, |
||||
cntDelta: it.cntDelta, |
||||
zCntDelta: it.zCntDelta, |
||||
pBuckets: it.pBuckets, |
||||
nBuckets: it.nBuckets, |
||||
pBucketsDelta: it.pBucketsDelta, |
||||
nBucketsDelta: it.nBucketsDelta, |
||||
|
||||
sum: it.sum, |
||||
leading: it.leading, |
||||
trailing: it.trailing, |
||||
|
||||
buf64: make([]byte, binary.MaxVarintLen64), |
||||
} |
||||
if binary.BigEndian.Uint16(a.b.bytes()) == 0 { |
||||
a.leading = 0xff |
||||
} |
||||
return a, nil |
||||
} |
||||
|
||||
func countSpans(spans []histogram.Span) int { |
||||
var cnt int |
||||
for _, s := range spans { |
||||
cnt += int(s.Length) |
||||
} |
||||
return cnt |
||||
} |
||||
|
||||
func newHistogramIterator(b []byte) *histogramIterator { |
||||
it := &histogramIterator{ |
||||
br: newBReader(b), |
||||
numTotal: binary.BigEndian.Uint16(b), |
||||
t: math.MinInt64, |
||||
} |
||||
// The first 2 bytes contain chunk headers.
|
||||
// We skip that for actual samples.
|
||||
_, _ = it.br.readBits(16) |
||||
return it |
||||
} |
||||
|
||||
func (c *HistogramChunk) iterator(it Iterator) *histogramIterator { |
||||
// This commet is copied from XORChunk.iterator:
|
||||
// Should iterators guarantee to act on a copy of the data so it doesn't lock append?
|
||||
// When using striped locks to guard access to chunks, probably yes.
|
||||
// Could only copy data if the chunk is not completed yet.
|
||||
if histogramIter, ok := it.(*histogramIterator); ok { |
||||
histogramIter.Reset(c.b.bytes()) |
||||
return histogramIter |
||||
} |
||||
return newHistogramIterator(c.b.bytes()) |
||||
} |
||||
|
||||
// Iterator implements the Chunk interface.
func (c *HistogramChunk) Iterator(it Iterator) Iterator {
	return c.iterator(it)
}
||||
|
||||
// HistogramAppender is an Appender implementation for sparse histograms.
type HistogramAppender struct {
	b *bstream

	// Metadata:
	schema         int32
	zThreshold     float64
	pSpans, nSpans []histogram.Span

	// Although we intend to start new chunks on counter resets, we still
	// have to handle negative deltas for gauge histograms. Therefore, even
	// deltas are signed types here (even for tDelta to not treat that one
	// specially).
	t                            int64
	cnt, zCnt                    uint64
	tDelta, cntDelta, zCntDelta  int64
	pBuckets, nBuckets           []int64
	pBucketsDelta, nBucketsDelta []int64

	// The sum is Gorilla xor encoded.
	sum      float64
	leading  uint8
	trailing uint8

	buf64 []byte // For working on varint64's.
}
||||
|
||||
func putVarint(b *bstream, buf []byte, x int64) { |
||||
for _, byt := range buf[:binary.PutVarint(buf, x)] { |
||||
b.writeByte(byt) |
||||
} |
||||
} |
||||
|
||||
func putUvarint(b *bstream, buf []byte, x uint64) { |
||||
for _, byt := range buf[:binary.PutUvarint(buf, x)] { |
||||
b.writeByte(byt) |
||||
} |
||||
} |
||||
|
||||
// Append implements Appender. This implementation panics because normal float
// samples must never be appended to a histogram chunk.
func (a *HistogramAppender) Append(int64, float64) {
	panic("appended a float sample to a histogram chunk")
}
||||
|
||||
// Appendable returns whether the chunk can be appended to, and if so
|
||||
// whether any recoding needs to happen using the provided interjections
|
||||
// (in case of any new buckets, positive or negative range, respectively).
|
||||
//
|
||||
// The chunk is not appendable in the following cases:
|
||||
//
|
||||
// • The schema has changed.
|
||||
//
|
||||
// • The threshold for the zero bucket has changed.
|
||||
//
|
||||
// • Any buckets have disappeared.
|
||||
//
|
||||
// • There was a counter reset in the count of observations or in any bucket,
|
||||
// including the zero bucket.
|
||||
//
|
||||
// • The last sample in the chunk was stale while the current sample is not stale.
|
||||
//
|
||||
// The method returns an additional boolean set to true if it is not appendable
|
||||
// because of a counter reset. If the given sample is stale, it is always ok to
|
||||
// append. If counterReset is true, okToAppend is always false.
|
||||
func (a *HistogramAppender) Appendable(h histogram.Histogram) ( |
||||
positiveInterjections, negativeInterjections []Interjection, |
||||
okToAppend bool, counterReset bool, |
||||
) { |
||||
if value.IsStaleNaN(h.Sum) { |
||||
// This is a stale sample whose buckets and spans don't matter.
|
||||
okToAppend = true |
||||
return |
||||
} |
||||
if value.IsStaleNaN(a.sum) { |
||||
// If the last sample was stale, then we can only accept stale
|
||||
// samples in this chunk.
|
||||
return |
||||
} |
||||
|
||||
if h.Count < a.cnt { |
||||
// There has been a counter reset.
|
||||
counterReset = true |
||||
return |
||||
} |
||||
|
||||
if h.Schema != a.schema || h.ZeroThreshold != a.zThreshold { |
||||
return |
||||
} |
||||
|
||||
if h.ZeroCount < a.zCnt { |
||||
// There has been a counter reset since ZeroThreshold didn't change.
|
||||
counterReset = true |
||||
return |
||||
} |
||||
|
||||
var ok bool |
||||
positiveInterjections, ok = compareSpans(a.pSpans, h.PositiveSpans) |
||||
if !ok { |
||||
counterReset = true |
||||
return |
||||
} |
||||
negativeInterjections, ok = compareSpans(a.nSpans, h.NegativeSpans) |
||||
if !ok { |
||||
counterReset = true |
||||
return |
||||
} |
||||
|
||||
if counterResetInAnyBucket(a.pBuckets, h.PositiveBuckets, a.pSpans, h.PositiveSpans) || |
||||
counterResetInAnyBucket(a.nBuckets, h.NegativeBuckets, a.nSpans, h.NegativeSpans) { |
||||
counterReset, positiveInterjections, negativeInterjections = true, nil, nil |
||||
return |
||||
} |
||||
|
||||
okToAppend = true |
||||
return |
||||
} |
||||
|
||||
// counterResetInAnyBucket returns true if there was a counter reset for any
// bucket. This should be called only when the bucket layout is the same or new
// buckets were added. It does not handle the case of buckets missing.
func counterResetInAnyBucket(oldBuckets, newBuckets []int64, oldSpans, newSpans []histogram.Span) bool {
	if len(oldSpans) == 0 || len(oldBuckets) == 0 {
		return false
	}

	oldSpanSliceIdx, newSpanSliceIdx := 0, 0                   // Index for the span slices.
	oldInsideSpanIdx, newInsideSpanIdx := uint32(0), uint32(0) // Index inside a span.
	oldIdx, newIdx := oldSpans[0].Offset, newSpans[0].Offset

	oldBucketSliceIdx, newBucketSliceIdx := 0, 0 // Index inside bucket slice.
	oldVal, newVal := oldBuckets[0], newBuckets[0]

	// Since we assume that new spans won't have missing buckets, there will
	// never be a case where the old index will not find a matching new
	// index. The two cursors are advanced in lockstep by absolute bucket
	// index; absolute bucket values are accumulated from the deltas as we go.
	for {
		if oldIdx == newIdx {
			// Same bucket in both layouts: a drop in the absolute
			// value is a counter reset.
			if newVal < oldVal {
				return true
			}
		}

		if oldIdx <= newIdx {
			// Moving ahead old bucket and span by 1 index.
			if oldInsideSpanIdx == oldSpans[oldSpanSliceIdx].Length-1 {
				// Current span is over.
				oldSpanSliceIdx++
				oldInsideSpanIdx = 0
				if oldSpanSliceIdx >= len(oldSpans) {
					// All old spans are over.
					break
				}
				oldIdx += 1 + oldSpans[oldSpanSliceIdx].Offset
			} else {
				oldInsideSpanIdx++
				oldIdx++
			}
			oldBucketSliceIdx++
			oldVal += oldBuckets[oldBucketSliceIdx]
		}

		if oldIdx > newIdx {
			// Moving ahead new bucket and span by 1 index.
			if newInsideSpanIdx == newSpans[newSpanSliceIdx].Length-1 {
				// Current span is over.
				newSpanSliceIdx++
				newInsideSpanIdx = 0
				if newSpanSliceIdx >= len(newSpans) {
					// All new spans are over. This should not
					// happen: the old spans above should catch
					// this first.
					panic("new spans over before old spans in counterReset")
				}
				newIdx += 1 + newSpans[newSpanSliceIdx].Offset
			} else {
				newInsideSpanIdx++
				newIdx++
			}
			newBucketSliceIdx++
			newVal += newBuckets[newBucketSliceIdx]
		}
	}

	return false
}
||||
|
||||
// AppendHistogram appends a histogram to the chunk. The caller must ensure that
// the histogram is properly structured, e.g. the number of buckets used
// corresponds to the number conveyed by the span structures. First call
// Appendable() and act accordingly!
func (a *HistogramAppender) AppendHistogram(t int64, h histogram.Histogram) {
	var tDelta, cntDelta, zCntDelta int64
	// The first two bytes of the chunk hold the number of samples stored so far.
	num := binary.BigEndian.Uint16(a.b.bytes())

	if value.IsStaleNaN(h.Sum) {
		// Emptying out other fields to write no buckets, and an empty
		// meta in case of first histogram in the chunk.
		h = histogram.Histogram{Sum: h.Sum}
	}

	switch num {
	case 0:
		// The first append gets the privilege to dictate the metadata
		// but it's also responsible for encoding it into the chunk!
		writeHistogramChunkMeta(a.b, h.Schema, h.ZeroThreshold, h.PositiveSpans, h.NegativeSpans)
		a.schema = h.Schema
		a.zThreshold = h.ZeroThreshold
		a.pSpans, a.nSpans = h.PositiveSpans, h.NegativeSpans
		numPBuckets, numNBuckets := countSpans(h.PositiveSpans), countSpans(h.NegativeSpans)
		a.pBuckets = make([]int64, numPBuckets)
		a.nBuckets = make([]int64, numNBuckets)
		a.pBucketsDelta = make([]int64, numPBuckets)
		a.nBucketsDelta = make([]int64, numNBuckets)

		// Now store the actual data. The first sample is stored as raw
		// values (no deltas yet to encode against).
		putVarint(a.b, a.buf64, t)
		putUvarint(a.b, a.buf64, h.Count)     // TODO(beorn7): Use putVarbitInt?
		putUvarint(a.b, a.buf64, h.ZeroCount) // TODO(beorn7): Use putVarbitInt?
		a.b.writeBits(math.Float64bits(h.Sum), 64)
		for _, buck := range h.PositiveBuckets {
			putVarint(a.b, a.buf64, buck) // TODO(beorn7): Use putVarbitInt?
		}
		for _, buck := range h.NegativeBuckets {
			putVarint(a.b, a.buf64, buck) // TODO(beorn7): Use putVarbitInt?
		}
	case 1:
		// The second sample is stored as deltas against the first.
		tDelta = t - a.t
		cntDelta = int64(h.Count) - int64(a.cnt)
		zCntDelta = int64(h.ZeroCount) - int64(a.zCnt)

		if value.IsStaleNaN(h.Sum) {
			// For a stale sample, only the (NaN) sum and the
			// timestamp carry information; zero out the count deltas.
			cntDelta, zCntDelta = 0, 0
		}

		putVarint(a.b, a.buf64, tDelta)    // TODO(beorn7): This should probably be putUvarint.
		putVarint(a.b, a.buf64, cntDelta)  // TODO(beorn7): Use putVarbitInt?
		putVarint(a.b, a.buf64, zCntDelta) // TODO(beorn7): Use putVarbitInt?

		a.writeSumDelta(h.Sum)

		for i, buck := range h.PositiveBuckets {
			delta := buck - a.pBuckets[i]
			putVarint(a.b, a.buf64, delta) // TODO(beorn7): Use putVarbitInt?
			a.pBucketsDelta[i] = delta
		}
		for i, buck := range h.NegativeBuckets {
			delta := buck - a.nBuckets[i]
			putVarint(a.b, a.buf64, delta) // TODO(beorn7): Use putVarbitInt?
			a.nBucketsDelta[i] = delta
		}

	default:
		// From the third sample on, store double-deltas ("dod" =
		// delta of deltas) against the previously stored deltas.
		tDelta = t - a.t
		cntDelta = int64(h.Count) - int64(a.cnt)
		zCntDelta = int64(h.ZeroCount) - int64(a.zCnt)

		tDod := tDelta - a.tDelta
		cntDod := cntDelta - a.cntDelta
		zCntDod := zCntDelta - a.zCntDelta

		if value.IsStaleNaN(h.Sum) {
			// See the staleness handling in case 1 above.
			cntDod, zCntDod = 0, 0
		}

		putVarbitInt(a.b, tDod)
		putVarbitInt(a.b, cntDod)
		putVarbitInt(a.b, zCntDod)

		a.writeSumDelta(h.Sum)

		for i, buck := range h.PositiveBuckets {
			delta := buck - a.pBuckets[i]
			dod := delta - a.pBucketsDelta[i]
			putVarbitInt(a.b, dod)
			a.pBucketsDelta[i] = delta
		}
		for i, buck := range h.NegativeBuckets {
			delta := buck - a.nBuckets[i]
			dod := delta - a.nBucketsDelta[i]
			putVarbitInt(a.b, dod)
			a.nBucketsDelta[i] = delta
		}
	}

	// Update the sample count in the chunk header.
	binary.BigEndian.PutUint16(a.b.bytes(), num+1)

	// Remember the state required to encode the next sample.
	a.t = t
	a.cnt = h.Count
	a.zCnt = h.ZeroCount
	a.tDelta = tDelta
	a.cntDelta = cntDelta
	a.zCntDelta = zCntDelta

	a.pBuckets, a.nBuckets = h.PositiveBuckets, h.NegativeBuckets
	// Note that the bucket deltas were already updated above.
	a.sum = h.Sum
}
||||
|
||||
// Recode converts the current chunk to accommodate an expansion of the set of
|
||||
// (positive and/or negative) buckets used, according to the provided
|
||||
// interjections, resulting in the honoring of the provided new positive and
|
||||
// negative spans.
|
||||
func (a *HistogramAppender) Recode( |
||||
positiveInterjections, negativeInterjections []Interjection, |
||||
positiveSpans, negativeSpans []histogram.Span, |
||||
) (Chunk, Appender) { |
||||
// TODO(beorn7): This currently just decodes everything and then encodes
|
||||
// it again with the new span layout. This can probably be done in-place
|
||||
// by editing the chunk. But let's first see how expensive it is in the
|
||||
// big picture.
|
||||
byts := a.b.bytes() |
||||
it := newHistogramIterator(byts) |
||||
hc := NewHistogramChunk() |
||||
app, err := hc.Appender() |
||||
if err != nil { |
||||
panic(err) |
||||
} |
||||
numPositiveBuckets, numNegativeBuckets := countSpans(positiveSpans), countSpans(negativeSpans) |
||||
|
||||
for it.Next() { |
||||
tOld, hOld := it.AtHistogram() |
||||
|
||||
// We have to newly allocate slices for the modified buckets
|
||||
// here because they are kept by the appender until the next
|
||||
// append.
|
||||
// TODO(beorn7): We might be able to optimize this.
|
||||
positiveBuckets := make([]int64, numPositiveBuckets) |
||||
negativeBuckets := make([]int64, numNegativeBuckets) |
||||
|
||||
// Save the modified histogram to the new chunk.
|
||||
hOld.PositiveSpans, hOld.NegativeSpans = positiveSpans, negativeSpans |
||||
if len(positiveInterjections) > 0 { |
||||
hOld.PositiveBuckets = interject(hOld.PositiveBuckets, positiveBuckets, positiveInterjections) |
||||
} |
||||
if len(negativeInterjections) > 0 { |
||||
hOld.NegativeBuckets = interject(hOld.NegativeBuckets, negativeBuckets, negativeInterjections) |
||||
} |
||||
app.AppendHistogram(tOld, hOld) |
||||
} |
||||
|
||||
hc.SetCounterResetHeader(CounterResetHeader(byts[2] & 0b11000000)) |
||||
return hc, app |
||||
} |
||||
|
||||
// writeSumDelta encodes the new sum v as a Gorilla-style XOR against the
// previously appended sum: a single 0 bit if unchanged, otherwise the
// significant bits of the XOR, reusing the previous leading/trailing-zero
// window when possible.
func (a *HistogramAppender) writeSumDelta(v float64) {
	vDelta := math.Float64bits(v) ^ math.Float64bits(a.sum)

	if vDelta == 0 {
		// Identical bit pattern: a single 0 bit suffices.
		a.b.writeBit(zero)
		return
	}
	a.b.writeBit(one)

	leading := uint8(bits.LeadingZeros64(vDelta))
	trailing := uint8(bits.TrailingZeros64(vDelta))

	// Clamp number of leading zeros to avoid overflow when encoding
	// (only 5 bits are available for it below).
	if leading >= 32 {
		leading = 31
	}

	if a.leading != 0xff && leading >= a.leading && trailing >= a.trailing {
		// The XOR fits into the previously used bit window: signal
		// reuse with a 0 control bit and write only the significant bits.
		a.b.writeBit(zero)
		a.b.writeBits(vDelta>>a.trailing, 64-int(a.leading)-int(a.trailing))
	} else {
		a.leading, a.trailing = leading, trailing

		// New window: 1 control bit, 5 bits of leading-zero count,
		// 6 bits of significant-bit count, then the bits themselves.
		a.b.writeBit(one)
		a.b.writeBits(uint64(leading), 5)

		// Note that if leading == trailing == 0, then sigbits == 64.
		// But that value doesn't actually fit into the 6 bits we have.
		// Luckily, we never need to encode 0 significant bits, since
		// that would put us in the other case (vdelta == 0). So
		// instead we write out a 0 and adjust it back to 64 on
		// unpacking.
		sigbits := 64 - leading - trailing
		a.b.writeBits(uint64(sigbits), 6)
		a.b.writeBits(vDelta>>trailing, int(sigbits))
	}
}
||||
|
||||
// histogramIterator decodes the samples of a histogram chunk. It mirrors the
// state kept by HistogramAppender so that each sample can be reconstructed
// from the stored deltas and dod's.
type histogramIterator struct {
	br       bstreamReader
	numTotal uint16 // Total number of samples in the chunk.
	numRead  uint16 // Number of samples decoded so far.

	// Metadata (constant for the whole chunk, decoded on the first Next call):
	schema         int32
	zThreshold     float64
	pSpans, nSpans []histogram.Span

	// For the fields that are tracked as deltas and ultimately dod's.
	t                            int64
	cnt, zCnt                    uint64
	tDelta, cntDelta, zCntDelta  int64
	pBuckets, nBuckets           []int64
	pBucketsDelta, nBucketsDelta []int64

	// The sum is Gorilla xor encoded.
	sum      float64
	leading  uint8 // Leading zero bits of the current XOR window.
	trailing uint8 // Trailing zero bits of the current XOR window.

	// err is sticky: once set, Next and Seek return false.
	err error
}
||||
|
||||
func (it *histogramIterator) Seek(t int64) bool { |
||||
if it.err != nil { |
||||
return false |
||||
} |
||||
|
||||
for t > it.t || it.numRead == 0 { |
||||
if !it.Next() { |
||||
return false |
||||
} |
||||
} |
||||
return true |
||||
} |
||||
|
||||
// At implements the Iterator interface. Histogram chunks contain no plain
// float samples, so calling At is always a programming error.
func (it *histogramIterator) At() (int64, float64) {
	panic("cannot call histogramIterator.At")
}
||||
|
||||
// ChunkEncoding returns the encoding of the chunk this iterator reads from,
// which is always EncHistogram.
func (it *histogramIterator) ChunkEncoding() Encoding {
	return EncHistogram
}
||||
|
||||
func (it *histogramIterator) AtHistogram() (int64, histogram.Histogram) { |
||||
if value.IsStaleNaN(it.sum) { |
||||
return it.t, histogram.Histogram{Sum: it.sum} |
||||
} |
||||
return it.t, histogram.Histogram{ |
||||
Count: it.cnt, |
||||
ZeroCount: it.zCnt, |
||||
Sum: it.sum, |
||||
ZeroThreshold: it.zThreshold, |
||||
Schema: it.schema, |
||||
PositiveSpans: it.pSpans, |
||||
NegativeSpans: it.nSpans, |
||||
PositiveBuckets: it.pBuckets, |
||||
NegativeBuckets: it.nBuckets, |
||||
} |
||||
} |
||||
|
||||
// Err returns the error that terminated iteration, if any.
func (it *histogramIterator) Err() error {
	return it.err
}
||||
|
||||
func (it *histogramIterator) Reset(b []byte) { |
||||
// The first 2 bytes contain chunk headers.
|
||||
// We skip that for actual samples.
|
||||
it.br = newBReader(b[2:]) |
||||
it.numTotal = binary.BigEndian.Uint16(b) |
||||
it.numRead = 0 |
||||
|
||||
it.t, it.cnt, it.zCnt = 0, 0, 0 |
||||
it.tDelta, it.cntDelta, it.zCntDelta = 0, 0, 0 |
||||
|
||||
// TODO(beorn7): Those will be recreated anyway.
|
||||
// Either delete them here entirely or recycle them
|
||||
// below if big enough.
|
||||
for i := range it.pBuckets { |
||||
it.pBuckets[i] = 0 |
||||
it.pBucketsDelta[i] = 0 |
||||
} |
||||
for i := range it.nBuckets { |
||||
it.nBuckets[i] = 0 |
||||
it.nBucketsDelta[i] = 0 |
||||
} |
||||
|
||||
it.sum = 0 |
||||
it.leading = 0 |
||||
it.trailing = 0 |
||||
it.err = nil |
||||
} |
||||
|
||||
// Next decodes the next sample from the chunk and returns true on success.
// The first sample is stored raw (preceded by the chunk metadata), the second
// as deltas, and all further samples as dod's, mirroring AppendHistogram.
func (it *histogramIterator) Next() bool {
	if it.err != nil || it.numRead == it.numTotal {
		return false
	}

	if it.numRead == 0 {
		// The first read is responsible for reading the chunk metadata
		// and for initializing fields that depend on it. We give
		// counter reset info at chunk level, hence we discard it here.
		schema, zeroThreshold, posSpans, negSpans, err := readHistogramChunkMeta(&it.br)
		if err != nil {
			it.err = err
			return false
		}
		it.schema = schema
		it.zThreshold = zeroThreshold
		it.pSpans, it.nSpans = posSpans, negSpans
		numPosBuckets, numNegBuckets := countSpans(posSpans), countSpans(negSpans)
		if numPosBuckets > 0 {
			it.pBuckets = make([]int64, numPosBuckets)
			it.pBucketsDelta = make([]int64, numPosBuckets)
		}
		if numNegBuckets > 0 {
			it.nBuckets = make([]int64, numNegBuckets)
			it.nBucketsDelta = make([]int64, numNegBuckets)
		}

		// Now read the actual data: raw timestamp, counts, sum, and
		// bucket values of the first sample.
		t, err := binary.ReadVarint(&it.br)
		if err != nil {
			it.err = err
			return false
		}
		it.t = t

		cnt, err := binary.ReadUvarint(&it.br)
		if err != nil {
			it.err = err
			return false
		}
		it.cnt = cnt

		zcnt, err := binary.ReadUvarint(&it.br)
		if err != nil {
			it.err = err
			return false
		}
		it.zCnt = zcnt

		sum, err := it.br.readBits(64)
		if err != nil {
			it.err = err
			return false
		}
		it.sum = math.Float64frombits(sum)

		for i := range it.pBuckets {
			v, err := binary.ReadVarint(&it.br)
			if err != nil {
				it.err = err
				return false
			}
			it.pBuckets[i] = v
		}
		for i := range it.nBuckets {
			v, err := binary.ReadVarint(&it.br)
			if err != nil {
				it.err = err
				return false
			}
			it.nBuckets[i] = v
		}

		it.numRead++
		return true
	}

	if it.numRead == 1 {
		// The second sample was stored as plain varint deltas.
		tDelta, err := binary.ReadVarint(&it.br)
		if err != nil {
			it.err = err
			return false
		}
		it.tDelta = tDelta
		it.t += int64(it.tDelta)

		cntDelta, err := binary.ReadVarint(&it.br)
		if err != nil {
			it.err = err
			return false
		}
		it.cntDelta = cntDelta
		it.cnt = uint64(int64(it.cnt) + it.cntDelta)

		zcntDelta, err := binary.ReadVarint(&it.br)
		if err != nil {
			it.err = err
			return false
		}
		it.zCntDelta = zcntDelta
		it.zCnt = uint64(int64(it.zCnt) + it.zCntDelta)

		ok := it.readSum()
		if !ok {
			return false
		}

		if value.IsStaleNaN(it.sum) {
			// Stale samples were written without bucket data.
			it.numRead++
			return true
		}

		for i := range it.pBuckets {
			delta, err := binary.ReadVarint(&it.br)
			if err != nil {
				it.err = err
				return false
			}
			it.pBucketsDelta[i] = delta
			it.pBuckets[i] = it.pBuckets[i] + delta
		}

		for i := range it.nBuckets {
			delta, err := binary.ReadVarint(&it.br)
			if err != nil {
				it.err = err
				return false
			}
			it.nBucketsDelta[i] = delta
			it.nBuckets[i] = it.nBuckets[i] + delta
		}

		it.numRead++
		return true
	}

	// From the third sample on, everything is stored as dod's
	// ("delta of deltas"): update the delta, then the value.
	tDod, err := readVarbitInt(&it.br)
	if err != nil {
		it.err = err
		return false
	}
	it.tDelta = it.tDelta + tDod
	it.t += it.tDelta

	cntDod, err := readVarbitInt(&it.br)
	if err != nil {
		it.err = err
		return false
	}
	it.cntDelta = it.cntDelta + cntDod
	it.cnt = uint64(int64(it.cnt) + it.cntDelta)

	zcntDod, err := readVarbitInt(&it.br)
	if err != nil {
		it.err = err
		return false
	}
	it.zCntDelta = it.zCntDelta + zcntDod
	it.zCnt = uint64(int64(it.zCnt) + it.zCntDelta)

	ok := it.readSum()
	if !ok {
		return false
	}

	if value.IsStaleNaN(it.sum) {
		// Stale samples were written without bucket data.
		it.numRead++
		return true
	}

	for i := range it.pBuckets {
		dod, err := readVarbitInt(&it.br)
		if err != nil {
			it.err = err
			return false
		}
		it.pBucketsDelta[i] = it.pBucketsDelta[i] + dod
		it.pBuckets[i] = it.pBuckets[i] + it.pBucketsDelta[i]
	}

	for i := range it.nBuckets {
		dod, err := readVarbitInt(&it.br)
		if err != nil {
			it.err = err
			return false
		}
		it.nBucketsDelta[i] = it.nBucketsDelta[i] + dod
		it.nBuckets[i] = it.nBuckets[i] + it.nBucketsDelta[i]
	}

	it.numRead++
	return true
}
||||
|
||||
// readSum decodes the Gorilla-XOR-encoded sum written by writeSumDelta into
// it.sum. It returns false (with it.err set) on a read error.
func (it *histogramIterator) readSum() bool {
	// First control bit: 0 means the sum is unchanged.
	bit, err := it.br.readBitFast()
	if err != nil {
		bit, err = it.br.readBit()
	}
	if err != nil {
		it.err = err
		return false
	}

	if bit == zero {
		return true // it.sum = it.sum
	}

	// Second control bit: 0 reuses the previous leading/trailing window,
	// 1 is followed by a new window description.
	bit, err = it.br.readBitFast()
	if err != nil {
		bit, err = it.br.readBit()
	}
	if err != nil {
		it.err = err
		return false
	}
	if bit == zero {
		// Reuse leading/trailing zero bits.
		// it.leading, it.trailing = it.leading, it.trailing
	} else {
		// New window: 5 bits of leading-zero count, then 6 bits of
		// significant-bit count.
		bits, err := it.br.readBitsFast(5)
		if err != nil {
			bits, err = it.br.readBits(5)
		}
		if err != nil {
			it.err = err
			return false
		}
		it.leading = uint8(bits)

		bits, err = it.br.readBitsFast(6)
		if err != nil {
			bits, err = it.br.readBits(6)
		}
		if err != nil {
			it.err = err
			return false
		}
		mbits := uint8(bits)
		// 0 significant bits here means we overflowed and we actually
		// need 64; see comment in encoder.
		if mbits == 0 {
			mbits = 64
		}
		it.trailing = 64 - it.leading - mbits
	}

	// Read the significant bits and XOR them onto the previous sum.
	mbits := 64 - it.leading - it.trailing
	bits, err := it.br.readBitsFast(mbits)
	if err != nil {
		bits, err = it.br.readBits(mbits)
	}
	if err != nil {
		it.err = err
		return false
	}
	vbits := math.Float64bits(it.sum)
	vbits ^= bits << it.trailing
	it.sum = math.Float64frombits(vbits)
	return true
}
@ -0,0 +1,286 @@ |
||||
// Copyright 2021 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package chunkenc |
||||
|
||||
import ( |
||||
"github.com/prometheus/prometheus/model/histogram" |
||||
) |
||||
|
||||
// writeHistogramChunkMeta encodes the per-chunk histogram metadata (schema,
// zero threshold, and both span layouts) into b. It must be called exactly
// once, before the first sample is written.
func writeHistogramChunkMeta(b *bstream, schema int32, zeroThreshold float64, positiveSpans, negativeSpans []histogram.Span) {
	putVarbitInt(b, int64(schema))
	putVarbitFloat(b, zeroThreshold)
	putHistogramChunkMetaSpans(b, positiveSpans)
	putHistogramChunkMetaSpans(b, negativeSpans)
}
||||
|
||||
func putHistogramChunkMetaSpans(b *bstream, spans []histogram.Span) { |
||||
putVarbitInt(b, int64(len(spans))) |
||||
for _, s := range spans { |
||||
putVarbitInt(b, int64(s.Length)) |
||||
putVarbitInt(b, int64(s.Offset)) |
||||
} |
||||
} |
||||
|
||||
func readHistogramChunkMeta(b *bstreamReader) ( |
||||
schema int32, zeroThreshold float64, |
||||
positiveSpans, negativeSpans []histogram.Span, |
||||
err error, |
||||
) { |
||||
_, err = b.ReadByte() // The header.
|
||||
if err != nil { |
||||
return |
||||
} |
||||
|
||||
v, err := readVarbitInt(b) |
||||
if err != nil { |
||||
return |
||||
} |
||||
schema = int32(v) |
||||
|
||||
zeroThreshold, err = readVarbitFloat(b) |
||||
if err != nil { |
||||
return |
||||
} |
||||
|
||||
positiveSpans, err = readHistogramChunkMetaSpans(b) |
||||
if err != nil { |
||||
return |
||||
} |
||||
|
||||
negativeSpans, err = readHistogramChunkMetaSpans(b) |
||||
if err != nil { |
||||
return |
||||
} |
||||
|
||||
return |
||||
} |
||||
|
||||
func readHistogramChunkMetaSpans(b *bstreamReader) ([]histogram.Span, error) { |
||||
var spans []histogram.Span |
||||
num, err := readVarbitInt(b) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
for i := 0; i < int(num); i++ { |
||||
|
||||
length, err := readVarbitInt(b) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
offset, err := readVarbitInt(b) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
spans = append(spans, histogram.Span{ |
||||
Length: uint32(length), |
||||
Offset: int32(offset), |
||||
}) |
||||
} |
||||
return spans, nil |
||||
} |
||||
|
||||
// bucketIterator iterates over the absolute bucket indices described by a
// slice of spans (i.e. indices that account for all span offsets).
type bucketIterator struct {
	spans  []histogram.Span
	span   int // Span position of last yielded bucket.
	bucket int // Bucket position within span of last yielded bucket.
	idx    int // Bucket index (globally across all spans) of last yielded bucket.
}
||||
|
||||
func newBucketIterator(spans []histogram.Span) *bucketIterator { |
||||
b := bucketIterator{ |
||||
spans: spans, |
||||
span: 0, |
||||
bucket: -1, |
||||
idx: -1, |
||||
} |
||||
if len(spans) > 0 { |
||||
b.idx += int(spans[0].Offset) |
||||
} |
||||
return &b |
||||
} |
||||
|
||||
func (b *bucketIterator) Next() (int, bool) { |
||||
// We're already out of bounds.
|
||||
if b.span >= len(b.spans) { |
||||
return 0, false |
||||
} |
||||
try: |
||||
if b.bucket < int(b.spans[b.span].Length-1) { // Try to move within same span.
|
||||
b.bucket++ |
||||
b.idx++ |
||||
return b.idx, true |
||||
} else if b.span < len(b.spans)-1 { // Try to move from one span to the next.
|
||||
b.span++ |
||||
b.idx += int(b.spans[b.span].Offset + 1) |
||||
b.bucket = 0 |
||||
if b.spans[b.span].Length == 0 { |
||||
// Pathological case that should never happen. We can't use this span, let's try again.
|
||||
goto try |
||||
} |
||||
return b.idx, true |
||||
} |
||||
// We're out of options.
|
||||
return 0, false |
||||
} |
||||
|
||||
// An Interjection describes how many new buckets have to be introduced before
// processing the pos'th delta from the original slice.
type Interjection struct {
	pos int // Index into the original delta slice before which to insert.
	num int // Number of new buckets to insert at that position.
}
||||
|
||||
// compareSpans returns the interjections to convert a slice of deltas to a new
// slice representing an expanded set of buckets, or false if incompatible
// (e.g. if buckets were removed).
//
// Example:
//
// Let's say the old buckets look like this:
//
//   span syntax: [offset, length]
//   spans      : [ 0 , 2 ]               [2,1]                [ 3 , 2 ]                     [3,1]       [1,1]
//   bucket idx : [0]   [1]    2     3    [4]    5     6     7    [8]    [9]   10    11    12    [13]   14    [15]
//   raw values    6     3                 3                       2      4                       5           1
//   deltas        6    -3                 0                      -1      2                       1          -4
//
// But now we introduce a new bucket layout. (Carefully chosen example where we
// have a span appended, one unchanged[*], one prepended, and two merge - in
// that order.)
//
// [*] unchanged in terms of which bucket indices they represent. but to achieve
// that, their offset needs to change if "disrupted" by spans changing ahead of
// them
//
//                                      \/ this one is "unchanged"
//   spans      : [  0  ,  3    ]       [1,1]       [    1    ,   4     ]                [  3  ,   3    ]
//   bucket idx : [0]   [1]   [2]    3    [4]    5    [6]    [7]   [8]   [9]   10    11    12   [13]   [14]   [15]
//   raw values    6     3     0           3           0      0     2     4                      5      0      1
//   deltas        6    -3    -3           3          -3      0     2     2                      1     -5      1
//   delta mods:                          / \                     / \                                  / \
//
// Note that whenever any new buckets are introduced, the subsequent "old"
// bucket needs to readjust its delta to the new base of 0. Thus, for the caller
// who wants to transform the set of original deltas to a new set of deltas to
// match a new span layout that adds buckets, we simply need to generate a list
// of interjections.
//
// Note: Within compareSpans we don't have to worry about the changes to the
// spans themselves, thanks to the iterators we get to work with the more useful
// bucket indices (which of course directly correspond to the buckets we have to
// adjust).
func compareSpans(a, b []histogram.Span) ([]Interjection, bool) {
	ai := newBucketIterator(a)
	bi := newBucketIterator(b)

	var interjections []Interjection

	// When inter.num becomes > 0, this becomes a valid interjection that
	// should be yielded when we finish a streak of new buckets.
	var inter Interjection

	// Walk both sequences of absolute bucket indices in lockstep.
	av, aOK := ai.Next()
	bv, bOK := bi.Next()
loop:
	for {
		switch {
		case aOK && bOK:
			switch {
			case av == bv: // Both have an identical value. move on!
				// Finish WIP interjection and reset.
				if inter.num > 0 {
					interjections = append(interjections, inter)
				}
				inter.num = 0
				av, aOK = ai.Next()
				bv, bOK = bi.Next()
				inter.pos++
			case av < bv: // b misses a value that is in a.
				return interjections, false
			case av > bv: // a misses a value that is in b. Forward b and recompare.
				inter.num++
				bv, bOK = bi.Next()
			}
		case aOK && !bOK: // b misses a value that is in a.
			return interjections, false
		case !aOK && bOK: // a misses a value that is in b. Forward b and recompare.
			inter.num++
			bv, bOK = bi.Next()
		default: // Both iterators ran out. We're done.
			if inter.num > 0 {
				interjections = append(interjections, inter)
			}
			break loop
		}
	}

	return interjections, true
}
||||
|
||||
// interject merges 'in' with the provided interjections and writes them into
|
||||
// 'out', which must already have the appropriate length.
|
||||
func interject(in, out []int64, interjections []Interjection) []int64 { |
||||
var ( |
||||
j int // Position in out.
|
||||
v int64 // The last value seen.
|
||||
interj int // The next interjection to process.
|
||||
) |
||||
for i, d := range in { |
||||
if interj < len(interjections) && i == interjections[interj].pos { |
||||
|
||||
// We have an interjection!
|
||||
// Add interjection.num new delta values such that their
|
||||
// bucket values equate 0.
|
||||
out[j] = int64(-v) |
||||
j++ |
||||
for x := 1; x < interjections[interj].num; x++ { |
||||
out[j] = 0 |
||||
j++ |
||||
} |
||||
interj++ |
||||
|
||||
// Now save the value from the input. The delta value we
|
||||
// should save is the original delta value + the last
|
||||
// value of the point before the interjection (to undo
|
||||
// the delta that was introduced by the interjection).
|
||||
out[j] = d + v |
||||
j++ |
||||
v = d + v |
||||
continue |
||||
} |
||||
|
||||
// If there was no interjection, the original delta is still
|
||||
// valid.
|
||||
out[j] = d |
||||
j++ |
||||
v += d |
||||
} |
||||
switch interj { |
||||
case len(interjections): |
||||
// All interjections processed. Nothing more to do.
|
||||
case len(interjections) - 1: |
||||
// One more interjection to process at the end.
|
||||
out[j] = int64(-v) |
||||
j++ |
||||
for x := 1; x < interjections[interj].num; x++ { |
||||
out[j] = 0 |
||||
j++ |
||||
} |
||||
default: |
||||
panic("unprocessed interjections left") |
||||
} |
||||
return out |
||||
} |
@ -0,0 +1,143 @@ |
||||
// Copyright 2021 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package chunkenc |
||||
|
||||
import ( |
||||
"math" |
||||
) |
||||
|
||||
// putVarbitFloat writes a float64 using varbit encoding. It does so by
// converting the underlying bits into an int64.
func putVarbitFloat(b *bstream, val float64) {
	// TODO(beorn7): The resulting int64 here will almost never be a small
	// integer. Thus, the varbit encoding doesn't really make sense
	// here. This function is only used to encode the zero threshold in
	// histograms. Based on that, here is an idea to improve the encoding:
	//
	// It is recommended to use (usually negative) powers of two as
	// thresholds. The default value for the zero threshold is in fact
	// 2^-128, or 0.5*2^-127, as it is represented by IEEE 754. It is
	// therefore worth a try to test if the threshold is a power of 2 and
	// then just store the exponent. 0 is also a common threshold for those
	// use cases where only observations of precisely zero should go to the
	// zero bucket. This results in the following proposal:
	// - First we store 1 byte.
	// - Iff that byte is 255 (all bits set), it is followed by a direct
	//   8-byte representation of the float.
	// - If the byte is 0, the threshold is 0.
	// - In all other cases, take the number represented by the byte,
	//   subtract 246, and that's the exponent (i.e. between -245 and
	//   +8, covering thresholds that are powers of 2 between 2^-246
	//   to 128).
	putVarbitInt(b, int64(math.Float64bits(val)))
}
||||
|
||||
// readVarbitFloat reads a float64 encoded with putVarbitFloat
|
||||
func readVarbitFloat(b *bstreamReader) (float64, error) { |
||||
val, err := readVarbitInt(b) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
return math.Float64frombits(uint64(val)), nil |
||||
} |
||||
|
||||
// putVarbitInt writes an int64 using varbit encoding with a bit bucketing
// optimized for the dod's observed in histogram buckets.
//
// The variable-length prefix written first tells the reader how many value
// bits follow:
//
//	'0':     value 0, no value bits
//	'10':    3 value bits
//	'110':   6 value bits
//	'1110':  9 value bits
//	'11110': 12 value bits
//	'11111': 64 value bits (the raw value)
//
// TODO(Dieterbe): We could improve this further: Each branch doesn't need to
// support any values of any of the prior branches. So we can expand the range
// of each branch. Do more with fewer bits. It comes at the price of more
// expensive encoding and decoding (cutting out and later adding back that
// center-piece we skip).
func putVarbitInt(b *bstream, val int64) {
	switch {
	case val == 0:
		b.writeBit(zero)
	case bitRange(val, 3): // -3 <= val <= 4
		b.writeBits(0b10, 2)
		b.writeBits(uint64(val), 3)
	case bitRange(val, 6): // -31 <= val <= 32
		b.writeBits(0b110, 3)
		b.writeBits(uint64(val), 6)
	case bitRange(val, 9): // -255 <= val <= 256
		b.writeBits(0b1110, 4)
		b.writeBits(uint64(val), 9)
	case bitRange(val, 12): // -2047 <= val <= 2048
		b.writeBits(0b11110, 5)
		b.writeBits(uint64(val), 12)
	default:
		b.writeBits(0b11111, 5)
		b.writeBits(uint64(val), 64)
	}
}
||||
|
||||
// readVarbitInt reads an int64 encoded with putVarbitInt.
func readVarbitInt(b *bstreamReader) (int64, error) {
	// Read the variable-length prefix: up to 5 bits, terminated by the
	// first 0 bit (see the prefix table on putVarbitInt).
	var d byte
	for i := 0; i < 5; i++ {
		d <<= 1
		bit, err := b.readBitFast()
		if err != nil {
			bit, err = b.readBit()
		}
		if err != nil {
			return 0, err
		}
		if bit == zero {
			break
		}
		d |= 1
	}

	var val int64
	var sz uint8

	switch d {
	case 0b0:
		// val == 0
	case 0b10:
		sz = 3
	case 0b110:
		sz = 6
	case 0b1110:
		sz = 9
	case 0b11110:
		sz = 12
	case 0b11111:
		// Do not use fast because it's very unlikely it will succeed.
		bits, err := b.readBits(64)
		if err != nil {
			return 0, err
		}

		val = int64(bits)
	}

	if sz != 0 {
		bits, err := b.readBitsFast(sz)
		if err != nil {
			bits, err = b.readBits(sz)
		}
		if err != nil {
			return 0, err
		}
		if bits > (1 << (sz - 1)) {
			// Sign-extend: values above 2^(sz-1) represent the
			// negative part of the range (see the per-branch value
			// ranges documented in putVarbitInt).
			bits = bits - (1 << sz)
		}
		val = int64(bits)
	}

	return val, nil
}
@ -1,155 +0,0 @@ |
||||
// Copyright 2021 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// The code in this file was largely written by Damian Gryski as part of
|
||||
// https://github.com/dgryski/go-tsz and published under the license below.
|
||||
// It was modified to accommodate reading from byte slices without modifying
|
||||
// the underlying bytes, which would panic when reading from mmap'd
|
||||
// read-only byte slices.
|
||||
|
||||
// Copyright (c) 2015,2016 Damian Gryski <damian@gryski.com>
|
||||
// All rights reserved.
|
||||
|
||||
// Redistribution and use in source and binary forms, with or without
|
||||
// modification, are permitted provided that the following conditions are met:
|
||||
|
||||
// * Redistributions of source code must retain the above copyright notice,
|
||||
// this list of conditions and the following disclaimer.
|
||||
//
|
||||
// * Redistributions in binary form must reproduce the above copyright notice,
|
||||
// this list of conditions and the following disclaimer in the documentation
|
||||
// and/or other materials provided with the distribution.
|
||||
//
|
||||
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
|
||||
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
||||
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
|
||||
// FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
||||
// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
||||
// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||||
// OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
package chunkenc |
||||
|
||||
import ( |
||||
"math" |
||||
) |
||||
|
||||
// putFloat64VBBucket writes a float64 using varbit optimized for SHS buckets.
|
||||
// It does so by converting the underlying bits into an int64.
|
||||
func putFloat64VBBucket(b *bstream, val float64) { |
||||
// TODO: Since this is used for the zero threshold, this almost always goes into the default
|
||||
// bit range (i.e. using 5+64 bits). So we can consider skipping `putInt64VBBucket` and directly
|
||||
// write the float and save 5 bits here.
|
||||
putInt64VBBucket(b, int64(math.Float64bits(val))) |
||||
} |
||||
|
||||
// readFloat64VBBucket reads a float64 using varbit optimized for SHS buckets
|
||||
func readFloat64VBBucket(b *bstreamReader) (float64, error) { |
||||
val, err := readInt64VBBucket(b) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
return math.Float64frombits(uint64(val)), nil |
||||
} |
||||
|
||||
// putInt64VBBucket writes an int64 using varbit optimized for SHS buckets.
|
||||
//
|
||||
// TODO(Dieterbe): We could improve this further: Each branch doesn't need to
|
||||
// support any values of any of the prior branches. So we can expand the range
|
||||
// of each branch. Do more with fewer bits. It comes at the price of more
|
||||
// expensive encoding and decoding (cutting out and later adding back that
|
||||
// center-piece we skip).
|
||||
func putInt64VBBucket(b *bstream, val int64) { |
||||
switch { |
||||
case val == 0: |
||||
b.writeBit(zero) |
||||
case bitRange(val, 3): // -3 <= val <= 4
|
||||
b.writeBits(0b10, 2) |
||||
b.writeBits(uint64(val), 3) |
||||
case bitRange(val, 6): // -31 <= val <= 32
|
||||
b.writeBits(0b110, 3) |
||||
b.writeBits(uint64(val), 6) |
||||
case bitRange(val, 9): // -255 <= val <= 256
|
||||
b.writeBits(0b1110, 4) |
||||
b.writeBits(uint64(val), 9) |
||||
case bitRange(val, 12): // -2047 <= val <= 2048
|
||||
b.writeBits(0b11110, 5) |
||||
b.writeBits(uint64(val), 12) |
||||
default: |
||||
b.writeBits(0b11111, 5) |
||||
b.writeBits(uint64(val), 64) |
||||
} |
||||
} |
||||
|
||||
// readInt64VBBucket reads an int64 encoded with putInt64VBBucket.
func readInt64VBBucket(b *bstreamReader) (int64, error) {
	// Read the unary-style prefix bit by bit (at most 5 bits). A 0 bit
	// terminates the prefix; d accumulates the prefix pattern.
	var d byte
	for i := 0; i < 5; i++ {
		d <<= 1
		bit, err := b.readBitFast()
		if err != nil {
			// Fast path failed (buffer boundary); fall back to the
			// general reader.
			bit, err = b.readBit()
		}
		if err != nil {
			return 0, err
		}
		if bit == zero {
			break
		}
		d |= 1
	}

	var val int64
	var sz uint8 // Payload width in bits; 0 means no payload to read below.

	// Map the prefix to the payload width chosen by putInt64VBBucket.
	switch d {
	case 0b0:
		// val == 0
	case 0b10:
		sz = 3
	case 0b110:
		sz = 6
	case 0b1110:
		sz = 9
	case 0b11110:
		sz = 12
	case 0b11111:
		// Do not use fast because it's very unlikely it will succeed.
		bits, err := b.readBits(64)
		if err != nil {
			return 0, err
		}

		val = int64(bits)
	}

	if sz != 0 {
		bits, err := b.readBitsFast(sz)
		if err != nil {
			bits, err = b.readBits(sz)
		}
		if err != nil {
			return 0, err
		}
		if bits > (1 << (sz - 1)) {
			// Undo the two's-complement truncation done by the
			// writer: payload values strictly above 2^(sz-1)
			// represent negative numbers. The unsigned subtraction
			// deliberately wraps around so that the final int64
			// conversion yields the intended negative value.
			bits = bits - (1 << sz)
		}
		val = int64(bits)
	}

	return val, nil
}
Loading…
Reference in new issue