Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/dataobj/internal/dataset/value_encoding_bitmap.go

694 lines
19 KiB

package dataset
import (
"errors"
"fmt"
"io"
"math/bits"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
)
func init() {
// Register the encoding so instances of it can be dynamically created.
registerValueEncoding(
datasetmd.PHYSICAL_TYPE_UINT64,
datasetmd.ENCODING_TYPE_BITMAP,
func(w streamio.Writer) valueEncoder { return newBitmapEncoder(w) },
func(r streamio.Reader) valueDecoder { return newBitmapDecoder(r) },
)
}
// maxRunLength is the largest number of values a single RLE run may hold.
// Run lengths are written shifted left by one bit (the low bit of a run
// header marks the run type), so a length must fit in 63 bits.
const maxRunLength uint64 = 1<<63 - 1 // 2^63-1
// bitmapEncoder encodes bitmaps of unsigned numbers up to 64 bits wide. To use
// bitmap with signed integers, callers should first encode the integers using
// zig-zag encoding to minimize the number of bits needed for negative values.
//
// Data is encoded with a hybrid of run-length encoding and bitpacking. Longer
// sequences of the same value are encoded with run-length encoding, while
// shorter sequences are bitpacked when possible. To avoid padding, bitpacking
// is only used when there are a multiple of 8 values to encode.
//
// Bitpacking is done using a dynamic bit width. The bit width is determined by
// the largest value in the set of 8 values. The bit width can be any value
// from 1 to 64, inclusive.
//
// # Format
//
// The bitmap format is a slight modification of the Parquet format to support
// longer runs and support streaming. The EBNF grammar is as follows:
//
//	bitmap            = run+;
//	run               = bit_packed_run | rle_run;
//	bit_packed_run    = bit_packed_header bit_packed_values;
//	bit_packed_header = (* uvarint(bit_packed_sets << 7 | (bit_width - 1) << 1 | 1) *)
//	bit_packed_sets   = (* value between 1 and 2^57-1, inclusive; each set has 8 elements *)
//	bit_width         = (* bit size of element in set; value between 1 and 64, inclusive;
//	                       stored in the header as bit_width-1 so that it fits in 6 bits *)
//	bit_packed_values = (* least significant bit of each byte to most significant bit of each byte *)
//	rle_run           = rle_header repeated_value;
//	rle_header        = (* uvarint(rle_run_len << 1) *)
//	rle_run_len       = (* value between 1 and 2^63-1, inclusive *)
//	repeated_value    = (* repeated value encoded as uvarint *)
//
// Where this differs from Parquet:
//
//   - We don't use a fixed width for bitpacked values, to allow for streaming
//     calls to Encode without knowing the width in advance. Instead, the width
//     is determined when flushing a bitpacked set to an internal buffer.
//
//     To minimize the overhead of encoding the width dynamically, we store
//     each bitpacked set with the smallest amount of bits possible. If two
//     sets have different widths, they are flushed as two different runs.
//
//   - For simplicity, repeated_value is encoded as uvarint rather than
//     flushing the value in its entirety.
//
//   - To facilitate streaming, we don't prepend the length of all bytes
//     written. Callers may choose to prepend the length. Without the length,
//     readers must take caution to not read past the end of the RLE sequence
//     by knowing exactly how many values were encoded.
type bitmapEncoder struct {
	w streamio.Writer // Destination for encoded runs.

	// bitmapEncoder is a basic state machine with three states:
	//
	//   READY    The default state; no values are being tracked. Appending a new
	//            value moves to the RLE state.
	//
	//   RLE      bitmapEncoder is tracking a run of runValue. Active when
	//            runLength>0. If a value not matching runValue is appended and
	//            runLength<8, the state moves to BITPACK. Otherwise, the previous
	//            run is flushed and a new RLE run is started.
	//
	//   BITPACK  bitmapEncoder is tracking a set of values to bitpack. Active when
	//            setSize>0. Once the set reaches 8 values, the set is flushed and
	//            the encoder resets to READY.

	runValue  uint64 // Value in the current run.
	runLength uint64 // Length of the current run.

	set     [8]uint64 // Set of values to bitpack.
	setSize byte      // Current number of elements in set.

	buf *bitpackBuffer // Buffer for multiple runs of bitpacked sets of the same bit width.
}
// newBitmapEncoder returns a bitmap encoder in its initial (READY) state that
// writes encoded numbers to w.
func newBitmapEncoder(w streamio.Writer) *bitmapEncoder {
	enc := &bitmapEncoder{buf: newBitpackBuffer()}
	enc.w = w
	return enc
}
// PhysicalType returns [datasetmd.PHYSICAL_TYPE_UINT64], the only physical
// type of value that Encode and EncodeN accept.
func (enc *bitmapEncoder) PhysicalType() datasetmd.PhysicalType {
	return datasetmd.PHYSICAL_TYPE_UINT64
}
// EncodingType returns [datasetmd.ENCODING_TYPE_BITMAP], the encoding type
// this encoder is registered under.
func (enc *bitmapEncoder) EncodingType() datasetmd.EncodingType {
	return datasetmd.ENCODING_TYPE_BITMAP
}
// Encode appends a single uint64 value to enc. Values of any other physical
// type are rejected with an error.
//
// Values are only written out once a run or a bitpacked set is complete; when
// that happens, Encode returns an error if writing to the underlying
// [streamio.Writer] fails.
//
// Call [bitmapEncoder.Flush] to end the current run and flush any remaining
// values.
func (enc *bitmapEncoder) Encode(v Value) error {
	if v.Type() != datasetmd.PHYSICAL_TYPE_UINT64 {
		return fmt.Errorf("invalid value type %s", v.Type())
	}
	next := v.Uint64()

	switch {
	case enc.runLength == 0 && enc.setSize == 0:
		// READY: nothing is tracked yet, so the value starts a new run.
		enc.runValue, enc.runLength = next, 1
		return nil

	case enc.runLength > 0 && next == enc.runValue:
		// RLE, same value: extend the run, flushing as soon as it reaches the
		// longest length the format can represent.
		enc.runLength++
		if enc.runLength == maxRunLength {
			return enc.flushRLE()
		}
		return nil

	case enc.runLength >= 8:
		// RLE, different value after a long run: write the run out and start
		// a new one with the incoming value.
		if err := enc.flushRLE(); err != nil {
			return err
		}
		enc.runValue, enc.runLength = next, 1
		return nil

	case enc.runLength > 0:
		// RLE, different value after a short run (fewer than 8 values): move
		// the run into the bitpack set and handle the value as BITPACK below.
		enc.switchToBitpack()
		fallthrough

	case enc.setSize > 0:
		// BITPACK: collect the value; a full set of 8 is flushed immediately.
		enc.set[enc.setSize] = next
		enc.setSize++
		if enc.setSize == 8 {
			return enc.flushBitpacked()
		}
		return nil

	default:
		panic("dataset.bitmapEncoder: invalid state")
	}
}
// EncodeN appends n repetitions of the uint64 value v to enc. Values of any
// other physical type are rejected with an error. A count of zero is a no-op
// and leaves the encoder state untouched.
//
// Runs that would exceed maxRunLength are split into multiple RLE runs. When
// a run or bitpacked set is flushed, EncodeN returns an error if writing to
// the underlying [streamio.Writer] fails.
//
// Call [bitmapEncoder.Flush] to end the current run and flush any remaining
// values.
func (enc *bitmapEncoder) EncodeN(v Value, n uint64) error {
	if v.Type() != datasetmd.PHYSICAL_TYPE_UINT64 {
		return fmt.Errorf("invalid value type %s", v.Type())
	}
	if n == 0 {
		// Nothing to append; in particular, don't end or convert the pending
		// run just because v differs from it.
		return nil
	}
	uv := v.Uint64()

	switch {
	case enc.runLength == 0 && enc.setSize == 0: // READY; start a new run.
		enc.runValue = uv
		fallthrough

	case enc.runLength > 0 && uv == enc.runValue: // RLE with matching value; continue the run.
		// Compare n against the room left in the run instead of computing
		// runLength+n, which can wrap around uint64 for very large n and
		// silently discard the run. Runs are flushed as soon as they reach
		// maxRunLength, so the subtraction does not underflow.
		room := maxRunLength - enc.runLength
		if n < room {
			enc.runLength += n
			return nil
		}

		// The run reaches the maximum length: flush it and encode whatever is
		// left over as a fresh run.
		n -= room
		enc.runLength = maxRunLength
		if err := enc.flushRLE(); err != nil {
			return err
		}
		if n > 0 {
			return enc.EncodeN(v, n)
		}
		return nil

	case enc.runLength > 0 && uv != enc.runValue: // RLE with different value; the run ended.
		if enc.runLength >= 8 {
			if err := enc.flushRLE(); err != nil {
				return err
			}
			return enc.EncodeN(v, n)
		}
		// The previous run is too short for RLE; move it into the bitpack set.
		enc.switchToBitpack()
		fallthrough

	case enc.setSize > 0: // BITPACK; top up the set with the new value.
		for n > 0 && enc.setSize < 8 {
			enc.set[enc.setSize] = uv
			enc.setSize++
			n--
		}
		if enc.setSize == 8 {
			if err := enc.flushBitpacked(); err != nil {
				return err
			}
		}
		// Any repetitions that didn't fit in the set start a new run from the
		// READY state.
		if n > 0 {
			return enc.EncodeN(v, n)
		}
		return nil

	default:
		panic("dataset.bitmapEncoder: invalid state")
	}
}
// switchToBitpack moves the encoder from the RLE state to the BITPACK state
// by expanding the current run into the bitpack set. Callers are expected to
// only invoke it for runs shorter than 8 values and to append the new value
// to the set afterwards.
func (enc *bitmapEncoder) switchToBitpack() {
	pending := byte(enc.runLength)
	for i := range enc.set[:pending] {
		enc.set[i] = enc.runValue
	}
	enc.setSize = pending
	enc.runLength = 0
}
// Flush writes any remaining values to the underlying [streamio.Writer]:
// buffered bitpacked sets first, followed by the pending run or partial set.
// It returns an error if any of those writes fail.
func (enc *bitmapEncoder) Flush() error {
	// We always flush using RLE. If bitmapEncoder was in the BITPACK state, we
	// don't have 8 values yet; if we did, they would've already been flushed in
	// Encode.
	return enc.flushRLE()
}
// flushRLE writes all pending state to the underlying writer using RLE runs
// and returns the encoder to the READY state.
//
// If a run is being tracked, it is written as a single RLE run. Otherwise,
// any values sitting in the (incomplete) bitpack set are written as a
// sequence of RLE runs, one per stretch of equal adjacent values.
func (enc *bitmapEncoder) flushRLE() error {
	// Sets held in the bitpack buffer were encoded before the pending run or
	// set, so they have to reach the writer first.
	if err := enc.buf.Flush(enc.w); err != nil {
		return err
	}

	if enc.runLength > 0 {
		if enc.runLength > maxRunLength {
			return fmt.Errorf("run length too large")
		}

		// Header, then the repeated value.
		if err := streamio.WriteUvarint(enc.w, enc.runLength<<1); err != nil {
			return err
		}
		if err := streamio.WriteUvarint(enc.w, enc.runValue); err != nil {
			return err
		}

		enc.runValue, enc.runLength = 0, 0
		return nil
	}

	// Consume the set as a sequence of runs. [start, end) always spans a
	// stretch of identical values.
	for start := byte(0); start < enc.setSize; {
		end := start + 1
		for end < enc.setSize && enc.set[end] == enc.set[start] {
			end++
		}

		// Header, then the repeated value.
		if err := streamio.WriteUvarint(enc.w, uint64(end-start)<<1); err != nil {
			return err
		}
		if err := streamio.WriteUvarint(enc.w, enc.set[start]); err != nil {
			return err
		}

		start = end
	}

	enc.setSize = 0
	return nil
}
// flushBitpacked bitpacks the current set of 8 values and appends the result
// to the bitpack buffer, which accumulates consecutive sets of the same bit
// width into a single run. If the buffer rejects the set (because it is full
// or the width changed), the buffer is flushed to the underlying writer and
// the append is retried once.
//
// flushBitpacked panics if the set does not hold exactly 8 values. It returns
// an error if flushing the buffer fails, or if the set still cannot be
// appended after the buffer was flushed.
func (enc *bitmapEncoder) flushBitpacked() error {
	if enc.setSize != 8 {
		panic("dataset.bitmapEncoder: flushBitpacked called with less than 8 values")
	}

	// Detect the bit width of the set. The widest element determines the
	// width, which equals the bit length of all elements OR-ed together. A
	// set of all zeroes still uses a width of 1.
	var combined uint64
	for i := 0; i < int(enc.setSize); i++ {
		combined |= enc.set[i]
	}
	width := bits.Len64(combined)
	if width < 1 {
		width = 1
	}

	// Write out the bitpacked values. Bitpacking 8 values of bit width N always
	// requires exactly N bytes.
	//
	// Each value is packed from the least significant bit of each byte to the
	// most significant bit, while still retaining the order of bits from the
	// original value.
	//
	// This bitpacking algorithm is challenging to reason about, but is retained
	// to align with Parquet's behaviour. I (rfratto) found it easier to reason
	// by considering how the output bits map to the input bits.
	//
	// This means that for width == 3:
	//
	//   index:     0   1   2   3   4   5   6   7
	//   dec value: 0   1   2   3   4   5   6   7
	//   bit value: 000 001 010 011 100 101 110 111
	//   bit label: ABC DEF GHI JKL MNO PQR STU VWX
	//
	//   index:     22111000 54443332 77766655
	//   bit value: 10001000 11000110 11111010
	//   bit label: HIDEFABC RMNOJKLG VWXSTUPQ
	//
	// Formatting it as a table better demonstrates the mapping:
	//
	//   Index  Labels  Input bit  Output byte  Output bit (for byte)
	//   -----  ------  ---------  -----------  ---------------------
	//   0      ABC     2 1 0      0 0 0        2 1 0
	//   1      DEF     5 4 3      0 0 0        5 4 3
	//   2      GHI     8 7 6      1 0 0        0 7 6
	//   3      JKL     11 10 9    1 1 1        3 2 1
	//   4      MNO     14 13 12   1 1 1        6 5 4
	//   5      PQR     17 16 15   2 2 1        1 0 7
	//   6      STU     20 19 18   2 2 2        4 3 2
	//   7      VWX     23 22 21   2 2 2        7 6 5
	//
	// So, for any given output bit, its value originates from:
	//
	//   * enc.set index:       output_bit/width
	//   * enc.set element bit: output_bit%width
	//
	// If there's a much simpler way to understand and do this packing, I'd love
	// to know.
	//
	// The packed bytes live in a fixed-size scratch array (width never exceeds
	// 64) rather than a freshly made slice per set; AppendSet copies the bytes
	// into its own buffer, so the scratch space is not retained.
	var scratch [64]byte
	packed := scratch[:width]
	for outputByte := range packed {
		var b byte
		for i := 0; i < 8; i++ {
			outputBit := outputByte*8 + i
			inputIndex := outputBit / width
			inputBit := outputBit % width

			// Set the bit in b.
			if enc.set[inputIndex]&(1<<inputBit) != 0 {
				b |= 1 << i
			}
		}
		packed[outputByte] = b
	}

	// Append the set to our buffer. For a valid width, AppendSet only fails
	// when the width changed or the buffer is full; in either case we flush
	// and try again. A second failure is reported to the caller instead of
	// silently dropping the set.
	if err := enc.buf.AppendSet(width, packed); err != nil {
		if err := enc.buf.Flush(enc.w); err != nil {
			return err
		}
		if err := enc.buf.AppendSet(width, packed); err != nil {
			return fmt.Errorf("appending bitpacked set: %w", err)
		}
	}

	enc.setSize = 0
	return nil
}
// Reset discards any pending run, set, and buffered bitpacked data without
// writing it, and points enc at w for subsequent writes.
func (enc *bitmapEncoder) Reset(w streamio.Writer) {
	enc.buf.Reset()
	enc.runValue, enc.runLength, enc.setSize = 0, 0, 0
	enc.w = w
}
// bitpackBuffer accumulates bitpacked sets that share the same bit width so
// they can be written out together as a single bitpacked run.
type bitpackBuffer struct {
	maxBufferSize int // Size limit for data, in bytes; enforced once at least one set is buffered.

	width int    // Bit width shared by every buffered set.
	sets  uint64 // Number of encoded sets. Each set has 8 elements.
	data  []byte // Bitpacked bytes of all buffered sets, in append order.
}
// newBitpackBuffer returns an empty bitpackBuffer with a 4KiB size limit.
func newBitpackBuffer() *bitpackBuffer {
	const defaultMaxBufferSize = 4 << 10 // 4KiB

	return &bitpackBuffer{maxBufferSize: defaultMaxBufferSize}
}
// AppendSet adds one bitpacked set of 8 elements, packed with the given bit
// width, to the buffer.
//
// It returns an error if width is outside of 1-64. When the buffer already
// holds at least one set, it also returns an error if data would push the
// buffer past its size limit or if width differs from the width of the
// buffered sets. The buffer is left unmodified on error.
func (b *bitpackBuffer) AppendSet(width int, data []byte) error {
	if width < 1 || width > 64 {
		return fmt.Errorf("invalid width: %d", width)
	}

	// An empty buffer accepts any set; the limits only apply once something
	// is buffered.
	if b.sets > 0 {
		if len(b.data)+len(data) > b.maxBufferSize {
			return fmt.Errorf("buffer full")
		}
		if b.width != width {
			return fmt.Errorf("width changed")
		}
	}

	b.data = append(b.data, data...)
	b.width = width
	b.sets++
	return nil
}
// Flush writes the buffered sets to w as a single bitpacked run (header
// followed by the packed bytes) and empties the buffer so it can accept more
// sets. Flush is a no-op when nothing is buffered. On error, the buffered
// state is left in place.
func (b *bitpackBuffer) Flush(w streamio.Writer) error {
	if b.sets == 0 {
		return nil
	}

	// The header of the bitpacked sequence encodes:
	//
	//   * The number of sets
	//   * The width of elements in the sets (1-64)
	//   * A flag indicating the header type
	//
	// To encode the width in 6 bits, we encode width-1. That reserves the bottom
	// 7 bits for metadata, and the remaining 57 bits for the number of sets.
	const maxSets uint64 = 1<<57 - 1

	// Validate constraints for safety.
	if b.width < 1 || b.width > 64 {
		return fmt.Errorf("invalid width: %d", b.width)
	}
	if b.sets > maxSets {
		// This shouldn't ever happen; 2^57-1 sets, in the best case (width of 1),
		// would require our buffer to be 144PB.
		return fmt.Errorf("too many sets: %d", b.sets)
	}

	var (
		typeFlag  = uint64(1)              // Marks the run as bitpacked.
		widthBits = uint64(b.width-1) << 1 // Width 1-64 stored as 0-63.
		setBits   = b.sets << 7
	)
	if err := streamio.WriteUvarint(w, setBits|widthBits|typeFlag); err != nil {
		return err
	}

	written, err := w.Write(b.data)
	if err != nil {
		return err
	}
	if written != len(b.data) {
		return fmt.Errorf("short write: %d != %d", written, len(b.data))
	}

	b.Reset()
	return nil
}
// Reset empties the buffer without flushing, discarding any buffered sets.
// The underlying byte slice is kept so its capacity can be reused.
func (b *bitpackBuffer) Reset() {
	b.data = b.data[:0]
	b.width, b.sets = 0, 0
}
// bitmapDecoder decodes uint64s from a bitmap-encoded stream. See the doc
// comment on [bitmapEncoder] for detail on the format.
type bitmapDecoder struct {
	r streamio.Reader // Source of encoded runs.

	// Like [bitmapEncoder], bitmapDecoder is a basic state machine with four
	// states:
	//
	//   READY          The default state; the decoder needs to pull a new run
	//                  header and move to RLE or BITPACK-READY.
	//
	//   RLE            The decoder is in the middle of an RLE-encoded run. Active
	//                  when runLength>0. runLength should decrease by 1 each time
	//                  Decode is called, returning runValue.
	//
	//   BITPACK-READY  The decoder is ready to read a new bitpacked set. Active
	//                  when sets>0 and setSize==0. The decoder needs to pull the
	//                  next set, update setSize and decrement sets by 1.
	//
	//   BITPACK-SET    The decoder is in the middle of a bitpacked set. Active
	//                  when setSize>0. setSize decreases by 1 each time Decode is
	//                  called, and the next bitpacked value in the set is
	//                  returned.
	//
	// bitmapDecoder always starts in the READY state, and the header it pulls
	// determines whether it moves to RLE or BITPACK-READY. After fully consuming
	// a run, it reverts back to READY.

	runValue  uint64 // Value of the current RLE run.
	runLength uint64 // Remaining values in the current RLE run.

	sets     int    // Number of bitpacked sets left to read, each of which contains 8 elements.
	setWidth int    // Number of bits to use for each value. Must be no greater than 64.
	setSize  byte   // Number of values left in the current bitpacked set.
	set      []byte // Current set of bitpacked values.
}
// newBitmapDecoder returns a bitmap decoder in its initial (READY) state that
// reads encoded numbers from r.
func newBitmapDecoder(r streamio.Reader) *bitmapDecoder {
	dec := new(bitmapDecoder)
	dec.r = r
	return dec
}
// PhysicalType returns [datasetmd.PHYSICAL_TYPE_UINT64], the physical type
// this decoder is registered for.
func (dec *bitmapDecoder) PhysicalType() datasetmd.PhysicalType {
	return datasetmd.PHYSICAL_TYPE_UINT64
}
// EncodingType returns [datasetmd.ENCODING_TYPE_BITMAP], the encoding type
// this decoder is registered under.
func (dec *bitmapDecoder) EncodingType() datasetmd.EncodingType {
	return datasetmd.ENCODING_TYPE_BITMAP
}
// Decode fills s with up to len(s) decoded values and reports how many were
// stored, followed by an error (if any).
//
// Hitting the end of the stream after at least one value was decoded is not
// an error; the short count is returned with a nil error. When the stream is
// already exhausted, Decode returns 0, [io.EOF]. An empty s yields 0, nil.
func (dec *bitmapDecoder) Decode(s []Value) (int, error) {
	for n := range s {
		val, err := dec.decode()
		switch {
		case err == nil:
			s[n] = val
		case !errors.Is(err, io.EOF):
			return n, err
		case n == 0:
			return 0, io.EOF
		default:
			return n, nil
		}
	}
	return len(s), nil
}
// decode reads the next uint64 value from the stream, pulling run headers and
// bitpacked sets from the reader as needed. Errors from reading are returned
// wrapped with the step that failed.
func (dec *bitmapDecoder) decode() (Value, error) {
	// See comment inside [bitmapDecoder] for the state machine details. Each
	// pass through the loop handles one state; states that only load data
	// fall out of the switch and loop around to the state they moved to.
	for {
		switch {
		case dec.runLength > 0: // RLE
			dec.runLength--
			return Uint64Value(dec.runValue), nil

		case dec.setSize > 0: // BITPACK-SET
			// Elements are consumed front to back, so the element to read sits
			// at index 8-setSize; its first bit is at index*setWidth within set.
			firstBit := int(8-dec.setSize) * dec.setWidth

			var val uint64
			for b := 0; b < dec.setWidth; b++ {
				pos := firstBit + b
				bit := (dec.set[pos/8] >> (pos % 8)) & 1
				val |= uint64(bit) << b
			}

			dec.setSize--
			return Uint64Value(val), nil

		case dec.sets > 0 && dec.setSize == 0: // BITPACK-READY
			if err := dec.nextBitpackSet(); err != nil {
				return Uint64Value(0), fmt.Errorf("reading bitpacked set: %w", err)
			}

		case dec.runLength == 0 && dec.sets == 0 && dec.setSize == 0: // READY
			if err := dec.readHeader(); err != nil {
				return Uint64Value(0), fmt.Errorf("reading header: %w", err)
			}

		default:
			panic("dataset.bitmapDecoder: invalid state")
		}
	}
}
// readHeader reads the next run header from the stream and moves the decoder
// to the matching state.
//
// The low bit of the header selects the run type. For a bitpacked run, the
// header carries the number of sets and the bit width (stored as width-1);
// the sets themselves are loaded later by [bitmapDecoder.nextBitpackSet]. For
// an RLE run, the header carries the run length and is followed by the
// repeated value, which is read here as well.
//
// An error from reading the header itself is returned as-is, so that an
// [io.EOF] at a run boundary can be recognized as the end of the stream. If
// the stream ends after an RLE header but before its value, the error is
// reported as [io.ErrUnexpectedEOF] so the truncation is not mistaken for a
// clean end of stream.
func (dec *bitmapDecoder) readHeader() error {
	// Read the next uvarint.
	header, err := streamio.ReadUvarint(dec.r)
	if err != nil {
		return err
	}

	if header&1 == 1 {
		// Start of a bitpacked run.
		dec.sets = int(header >> 7)
		dec.setWidth = int((header>>1)&0x3f) + 1
		dec.setSize = 0 // Sets will be loaded in [bitmapDecoder.nextBitpackSet].

		// A set of 8 values of width N occupies exactly N bytes. Reuse the
		// previous set's storage when it is large enough; its contents are
		// fully overwritten before being read again.
		if cap(dec.set) >= dec.setWidth {
			dec.set = dec.set[:dec.setWidth]
		} else {
			dec.set = make([]byte, dec.setWidth)
		}
		return nil
	}

	// RLE run.
	runLength := header >> 1
	val, err := streamio.ReadUvarint(dec.r)
	if errors.Is(err, io.EOF) {
		// The header promised a value; running out of data here means the
		// stream was cut short.
		return fmt.Errorf("reading run value: %w", io.ErrUnexpectedEOF)
	} else if err != nil {
		return err
	}

	dec.runLength = runLength
	dec.runValue = val
	return nil
}
// nextBitpackSet loads the next bitpack set into dec.set, marks all 8 of its
// elements as unread, and decrements the sets counter.
//
// It returns an error if no sets are left in the current run or if the set
// cannot be read in full. Because the run header already announced the set,
// running out of data here is always reported as [io.ErrUnexpectedEOF] rather
// than [io.EOF], so the truncation is not mistaken for a clean end of stream.
func (dec *bitmapDecoder) nextBitpackSet() error {
	if dec.sets == 0 {
		return fmt.Errorf("no bitpacked sets left")
	}

	// dec.set is sized in [bitmapDecoder.readHeader]. io.ReadFull reports
	// io.EOF when no bytes at all could be read, and io.ErrUnexpectedEOF for
	// a partial read; normalize the former.
	if _, err := io.ReadFull(dec.r, dec.set); err != nil {
		if errors.Is(err, io.EOF) {
			return io.ErrUnexpectedEOF
		}
		return err
	}

	dec.setSize = 8 // Always 8 elements in each set.
	dec.sets--
	return nil
}
// Reset returns dec to its initial (READY) state, dropping any partially
// consumed run or set, and points it at r for subsequent reads.
func (dec *bitmapDecoder) Reset(r streamio.Reader) {
	// Every field other than r goes back to its zero value.
	*dec = bitmapDecoder{r: r}
}