Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/dataobj/internal/dataset/value_encoding_bitmap.go

694 lines
20 KiB

package dataset
import (
"encoding/binary"
"fmt"
"io"
"math/bits"
"github.com/grafana/loki/v3/pkg/columnar"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
"github.com/grafana/loki/v3/pkg/memory"
)
// maxRunLength is the largest number of values a single RLE run may hold.
// The RLE header is written as runLength<<1 (the low bit distinguishes RLE
// runs from bitpacked runs), which leaves 63 bits for the length.
const maxRunLength uint64 = 1<<63 - 1 // 2^63-1
// bitmapEncoder encodes bitmaps of unsigned numbers. The format supports
// values up to 64 bits wide. To use bitmap with signed integers, callers
// should first encode the integers using zig-zag encoding to minimize the
// number of bits needed for negative values.
//
// NOTE: Encode and EncodeN currently reject any value other than 0 and 1, so
// in practice only boolean (presence) bitmaps are produced.
//
// Data is encoded with a hybrid of run-length encoding and bitpacking. Longer
// sequences of the same value are encoded with run-length encoding, while
// shorter sequences are bitpacked when possible. To avoid padding, bitpacking
// is only used when there are a multiple of 8 values to encode.
//
// Bitpacking is done using a dynamic bit width. The bit width is determined by
// the largest value in the set of 8 values. The bit width can be any value
// from 1 to 64, inclusive.
//
// # Format
//
// The bitmap format is a slight modification of the Parquet format to support
// longer runs and support streaming. The EBNF grammar is as follows:
//
//	bitmap            = run+;
//	run               = bit_packed_run | rle_run;
//	bit_packed_run    = bit_packed_header bit_packed_values;
//	bit_packed_header = (* uvarint(bit_packed_sets << 7 | (bit_width - 1) << 1 | 1) *)
//	bit_packed_sets   = (* value between 1 and 2^57-1, inclusive; each set has 8 elements *)
//	bit_width         = (* bit size of element in set; value between 1 and 64, inclusive *)
//	bit_packed_values = (* least significant bit of each byte to most significant bit of each byte *)
//	rle_run           = rle_header repeated_value;
//	rle_header        = (* uvarint(rle_run_len << 1) *)
//	rle_run_len       = (* value between 1 and 2^63-1, inclusive *)
//	repeated_value    = (* repeated value encoded as uvarint *)
//
// Note that the header stores bit_width-1 so that widths 1 through 64 fit in
// 6 bits.
//
// Where this differs from Parquet:
//
//   - We don't use a fixed width for bitpacked values, to allow for streaming
//     calls to Encode without knowing the width in advance. Instead, the width
//     is determined when flushing a bitpacked set to an internal buffer.
//
//     To minimize the overhead of encoding the width dynamically, we store
//     each bitpacked set with the smallest amount of bits possible. If two
//     sets have different widths, they are flushed as two different runs.
//
//   - For simplicity, repeated_value is encoded as uvarint rather than
//     flushing the value in its entirety.
//
//   - To facilitate streaming, we don't prepend the length of all bytes
//     written. Callers may choose to prepend the length. Without the length,
//     readers must take caution to not read past the end of the RLE sequence
//     by knowing exactly how many values were encoded.
type bitmapEncoder struct {
	w streamio.Writer // Destination for encoded runs.

	// bitmapEncoder is a basic state machine with three states:
	//
	// READY    The default state; no values are being tracked. Appending a new
	//          value moves to the RLE state.
	//
	// RLE      bitmapEncoder is tracking a run of runValue. Active when
	//          runLength>0. If a value not matching runValue is appended and
	//          runLength<8, the state moves to BITPACK. Otherwise, the previous
	//          run is flushed and a new RLE run is started.
	//
	// BITPACK  bitmapEncoder is tracking a set of values to bitpack. Active when
	//          setSize>0. Once the set reaches 8 values, the set is flushed and
	//          the encoder resets to READY.

	runValue  uint64 // Value in the current run.
	runLength uint64 // Length of the current run.

	set     [8]uint64 // Set of values to bitpack.
	setSize byte      // Current number of elements in set.

	buf *bitpackBuffer // Buffer for multiple runs of bitpacked sets of the same bit width.
}
// newBitmapEncoder returns a bitmapEncoder in the READY state whose encoded
// output is written to w.
func newBitmapEncoder(w streamio.Writer) *bitmapEncoder {
	enc := new(bitmapEncoder)
	enc.w = w
	enc.buf = newBitpackBuffer()
	return enc
}
// PhysicalType reports the physical type of the values the encoder accepts,
// which is always [datasetmd.PHYSICAL_TYPE_UINT64].
func (*bitmapEncoder) PhysicalType() datasetmd.PhysicalType {
	return datasetmd.PHYSICAL_TYPE_UINT64
}
// EncodingType reports the encoding produced by the encoder, which is always
// [datasetmd.ENCODING_TYPE_BITMAP].
func (*bitmapEncoder) EncodingType() datasetmd.EncodingType {
	return datasetmd.ENCODING_TYPE_BITMAP
}
// Encode appends a single uint64 value to enc. Only the values 0 and 1 are
// accepted; anything else, or a value of another physical type, is rejected
// with an error.
//
// Values are normally only buffered. When appending v completes a bitpacked
// set, ends an RLE run of 8 or more values, or brings a run to maxRunLength,
// the pending data is flushed, and Encode returns an error if writing to the
// underlying [streamio.Writer] fails.
//
// Call [bitmapEncoder.Flush] to end the current run and flush any remaining
// values.
func (enc *bitmapEncoder) Encode(v Value) error {
	if v.Type() != datasetmd.PHYSICAL_TYPE_UINT64 {
		return fmt.Errorf("invalid value type %s", v.Type())
	}

	bit := v.Uint64()
	if bit > 1 {
		// Both decoder and encoder specialize on boolean values for now as this
		// encoding is used only for presence bitmap at the moment. We can add
		// support for larger values later if required.
		return fmt.Errorf("invalid value %d of %s", bit, v.Type())
	}

	// Cases that return handle the value completely. Cases that leave the
	// switch have put the encoder in the BITPACK state, and the value is
	// added to the set below.
	switch {
	case enc.runLength == 0 && enc.setSize == 0:
		// READY: the value starts a new run.
		enc.runValue, enc.runLength = bit, 1
		return nil

	case enc.runLength > 0 && bit == enc.runValue:
		// RLE, same value: extend the run, flushing once it cannot grow
		// any further.
		enc.runLength++
		if enc.runLength != maxRunLength {
			return nil
		}
		return enc.flushRLE()

	case enc.runLength >= 8:
		// RLE, different value after a run long enough to keep as RLE:
		// write it out and start over with the new value.
		if err := enc.flushRLE(); err != nil {
			return err
		}
		enc.runValue, enc.runLength = bit, 1
		return nil

	case enc.runLength > 0:
		// RLE, different value after a short run: convert the run into a
		// partially filled set.
		enc.switchToBitpack()

	case enc.setSize > 0:
		// BITPACK: the set is already being filled.

	default:
		panic("dataset.bitmapEncoder: invalid state")
	}

	enc.set[enc.setSize] = bit
	enc.setSize++
	if enc.setSize < 8 {
		return nil
	}
	return enc.flushBitpacked()
}
// EncodeN appends n copies of the uint64 value v to enc, extending or
// starting RLE runs in bulk instead of one value at a time.
//
// Only the values 0 and 1 are accepted; anything else, or a value of another
// physical type, is rejected with an error. For a valid value, n == 0 is a
// no-op that leaves the encoder state untouched.
//
// EncodeN returns an error if pending data has to be flushed and writing to
// the underlying [streamio.Writer] fails.
//
// Call [bitmapEncoder.Flush] to end the current run and flush any remaining
// values.
func (enc *bitmapEncoder) EncodeN(v Value, n uint64) error {
	if v.Type() != datasetmd.PHYSICAL_TYPE_UINT64 {
		return fmt.Errorf("invalid value type %s", v.Type())
	}

	uv := v.Uint64()
	if uv != 0 && uv != 1 {
		// Both decoder and encoder specialize on boolean values for now as this
		// encoding is used only for presence bitmap at the moment. We can add
		// support for larger values later if required.
		return fmt.Errorf("invalid value %d of %s", uv, v.Type())
	}

	if n == 0 {
		// Nothing to append. Without this guard an empty append of a
		// different value would end or convert the current run for no reason.
		return nil
	}

	switch {
	case enc.runLength == 0 && enc.setSize == 0: // READY; start a new run.
		enc.runValue = uv
		fallthrough

	case enc.runLength > 0 && uv == enc.runValue: // RLE with matching value; continue the run.
		// Compare n against the room left in the run rather than testing
		// runLength+n, which wraps around for very large n and would
		// silently record a run that is far too short.
		if enc.runLength < maxRunLength && n < maxRunLength-enc.runLength {
			enc.runLength += n
			return nil
		}

		// The run reaches the maximum length: fill it, flush it, and encode
		// whatever is left as a fresh run.
		n -= maxRunLength - enc.runLength // remaining count
		enc.runLength = maxRunLength
		if err := enc.flushRLE(); err != nil {
			return err
		}
		if n > 0 {
			return enc.EncodeN(v, n)
		}
		return nil

	case enc.runLength > 0 && uv != enc.runValue: // RLE with different value; the run ended.
		if enc.runLength >= 8 {
			// Long enough to keep as RLE: flush it, then encode from READY.
			if err := enc.flushRLE(); err != nil {
				return err
			}
			return enc.EncodeN(v, n)
		}

		// Short run: convert it into a partially filled set.
		enc.switchToBitpack()
		fallthrough

	case enc.setSize > 0: // BITPACK
		// Fill up remaining slots with the new value.
		for n > 0 && enc.setSize < 8 {
			enc.set[enc.setSize] = uv
			enc.setSize++
			n--
		}

		if enc.setSize == 8 {
			if err := enc.flushBitpacked(); err != nil {
				return err
			}
		}

		// Anything left over is encoded from the READY state, where it
		// becomes an RLE run.
		if n > 0 {
			return enc.EncodeN(v, n)
		}
		return nil

	default:
		panic("dataset.bitmapEncoder: invalid state")
	}
}
// switchToBitpack moves enc from the RLE state to the BITPACK state by
// turning the current run into the first runLength elements of the set. The
// caller is responsible for appending the value that ended the run.
//
// Callers only invoke it while runLength is below 8, so the run always fits
// in the set.
func (enc *bitmapEncoder) switchToBitpack() {
	pending := byte(enc.runLength)

	enc.setSize = pending
	for slot := range pending {
		enc.set[slot] = enc.runValue
	}

	enc.runLength = 0
}
// Flush ends the current run and writes every value still held by enc to the
// underlying [streamio.Writer].
func (enc *bitmapEncoder) Flush() error {
	// Remaining values are always written as RLE runs. A set that is still
	// pending here has fewer than 8 values (a full set is flushed by Encode as
	// soon as it fills up), so it cannot be bitpacked without padding.
	if err := enc.flushRLE(); err != nil {
		return err
	}
	return nil
}
// flushRLE writes everything enc is holding as RLE runs. Buffered bitpacked
// sets are written first so that output order matches input order. The
// current run is then written if there is one; otherwise a partially filled
// set is written as one RLE run per stretch of equal values.
func (enc *bitmapEncoder) flushRLE() error {
	// Flush anything in the bitpack buffer.
	if err := enc.buf.Flush(enc.w); err != nil {
		return err
	}

	// writeRun emits one rle_run: the header (length<<1) followed by the
	// repeated value.
	writeRun := func(length, value uint64) error {
		if err := streamio.WriteUvarint(enc.w, length<<1); err != nil {
			return err
		}
		return streamio.WriteUvarint(enc.w, value)
	}

	if enc.runLength > 0 {
		if enc.runLength > maxRunLength {
			return fmt.Errorf("run length too large")
		}
		if err := writeRun(enc.runLength, enc.runValue); err != nil {
			return err
		}
		enc.runLength, enc.runValue = 0, 0
		return nil
	}

	// Consume the set as a sequence of runs.
	pending := enc.set[:enc.setSize]
	for start := 0; start < len(pending); {
		end := start + 1
		for end < len(pending) && pending[end] == pending[start] {
			end++
		}
		if err := writeRun(uint64(end-start), pending[start]); err != nil {
			return err
		}
		start = end
	}

	enc.setSize = 0
	return nil
}
// flushBitpacked bitpacks the current set of 8 values and appends it to the
// buffer that accumulates bitpacked sets of the same width. If the buffer
// refuses the set (its width differs or the buffer is full), the buffer is
// flushed to the underlying writer and the append is retried once.
//
// It returns an error if flushing the buffer fails or if the set still cannot
// be appended after the flush. On error the set is left in place.
//
// flushBitpacked panics if the set does not hold exactly 8 values.
func (enc *bitmapEncoder) flushBitpacked() error {
	if enc.setSize != 8 {
		panic("dataset.bitmapEncoder: flushBitpacked called with less than 8 values")
	}

	// Detect the bit width of the set.
	width := 1
	for i := 0; i < int(enc.setSize); i++ {
		if bitLength := bits.Len64(enc.set[i]); bitLength > width {
			width = bitLength
		}
	}

	// Write out the bitpacked values. Bitpacking 8 values of bit width N always
	// requires exactly N bytes.
	//
	// Each value is packed from the least significant bit of each byte to the
	// most significant bit, while still retaining the order of bits from the
	// original value.
	//
	// This bitpacking algorithm is challenging to reason about, but is retained
	// to align with Parquet's behaviour. I (rfratto) found it easier to reason
	// by considering how the output bits map to the input bits.
	//
	// This means that for width == 3:
	//
	//	index:     0   1   2   3   4   5   6   7
	//	dec value: 0   1   2   3   4   5   6   7
	//	bit value: 000 001 010 011 100 101 110 111
	//	bit label: ABC DEF GHI JKL MNO PQR STU VWX
	//
	//	index:     22111000 54443332 77766655
	//	bit value: 10001000 11000110 11111010
	//	bit label: HIDEFABC RMNOJKLG VWXSTUPQ
	//
	// Formatting it as a table better demonstrates the mapping:
	//
	//	Index Labels Input bit Output byte Output bit (for byte)
	//	----- ------ --------- ----------- ---------------------
	//	0     ABC     2  1  0  0 0 0       2 1 0
	//	1     DEF     5  4  3  0 0 0       5 4 3
	//	2     GHI     8  7  6  1 0 0       0 7 6
	//	3     JKL    11 10  9  1 1 1       3 2 1
	//	4     MNO    14 13 12  1 1 1       6 5 4
	//	5     PQR    17 16 15  2 2 1       1 0 7
	//	6     STU    20 19 18  2 2 2       4 3 2
	//	7     VWX    23 22 21  2 2 2       7 6 5
	//
	// So, for any given output bit, its value originates from:
	//
	//   * enc.set index:       output_bit/width
	//   * enc.set element bit: output_bit%width
	//
	// If there's a much simpler way to understand and do this packing, I'd love
	// to know.

	// width never exceeds 64, so a fixed-size array is enough scratch space
	// and spares creating a new slice for every set. AppendSet copies the
	// bytes, so nothing keeps a reference to the array.
	var packed [64]byte
	for outputByte := 0; outputByte < width; outputByte++ {
		var b byte

		for i := 0; i < 8; i++ {
			outputBit := outputByte*8 + i
			inputIndex := outputBit / width
			inputBit := outputBit % width

			// Set the bit in b.
			if enc.set[inputIndex]&(1<<inputBit) != 0 {
				b |= 1 << i
			}
		}

		packed[outputByte] = b
	}
	buf := packed[:width]

	// Append the set to our buffer. For a valid width it only returns an error
	// in two cases:
	//
	// 1. The width changed, or
	// 2. the buffer is full.
	//
	// In either case, we want to flush and try again. If the retry fails as
	// well, the error is reported rather than silently dropping the set.
	if err := enc.buf.AppendSet(width, buf); err != nil {
		if err := enc.buf.Flush(enc.w); err != nil {
			return err
		}
		if err := enc.buf.AppendSet(width, buf); err != nil {
			return err
		}
	}

	enc.setSize = 0
	return nil
}
// Reset discards all pending state (the current run, the partially filled
// set, and any buffered bitpacked sets) without writing it, and points enc at
// w for subsequent output.
func (enc *bitmapEncoder) Reset(w streamio.Writer) {
	enc.w = w
	enc.runValue, enc.runLength = 0, 0
	enc.setSize = 0
	enc.buf.Reset()
}
// bitpackBuffer accumulates consecutive bitpacked sets that share one bit
// width so that they can be written out as a single bitpacked run.
type bitpackBuffer struct {
	maxBufferSize int // Size in bytes beyond which a non-empty buffer rejects new sets.

	width int    // Bit width of the buffered sets; only meaningful while sets > 0.
	sets  uint64 // Number of encoded sets. Each set has 8 elements.
	data  []byte // Total amount of data.
}

// newBitpackBuffer returns an empty bitpackBuffer limited to 4KiB of data.
func newBitpackBuffer() *bitpackBuffer {
	const defaultMaxBufferSize = 4096 // 4KiB

	b := new(bitpackBuffer)
	b.maxBufferSize = defaultMaxBufferSize
	return b
}

// AppendSet appends a bitpacked set of 8 elements to the buffer. data is
// copied, so the caller may reuse it afterwards.
//
// AppendSet fails, leaving the buffer unchanged, if width is outside of 1
// through 64, or if the buffer already holds sets and either adding data
// would exceed the maximum buffer size or width differs from the width of
// the buffered sets. An empty buffer accepts a set of any size.
func (b *bitpackBuffer) AppendSet(width int, data []byte) error {
	if width < 1 || width > 64 {
		return fmt.Errorf("invalid width: %d", width)
	}

	// The remaining restrictions only apply once something is buffered.
	if b.sets > 0 {
		if len(b.data)+len(data) > b.maxBufferSize {
			return fmt.Errorf("buffer full")
		}
		if b.width != width {
			return fmt.Errorf("width changed")
		}
	}

	b.data = append(b.data, data...)
	b.width = width
	b.sets++
	return nil
}
// Flush writes the buffered sets to w as a single bitpacked run (header
// followed by the packed bytes) and empties the buffer for more writes. It
// does nothing if no sets are buffered.
//
// Flush returns an error, leaving the buffer untouched, if the buffered state
// is invalid or if writing to w fails or is short.
func (b *bitpackBuffer) Flush(w streamio.Writer) error {
	if b.sets == 0 {
		return nil
	}

	// The header of the bitpacked sequence encodes:
	//
	//   * The number of sets
	//   * The width of elements in the sets (1-64)
	//   * A flag indicating the header type
	//
	// To encode the width in 6 bits, we encode width-1. That reserves the bottom
	// 7 bits for metadata, and the remaining 57 bits for the number of sets.
	const maxSets uint64 = 1<<57 - 1

	// Validate constraints for safety.
	if b.width < 1 || b.width > 64 {
		return fmt.Errorf("invalid width: %d", b.width)
	}
	if b.sets > maxSets {
		// This shouldn't ever happen; 2^57-1 sets, in the best case (width of 1),
		// would require our buffer to be 144PB.
		return fmt.Errorf("too many sets: %d", b.sets)
	}

	// Layout, from most to least significant bit: set count, width-1, and
	// the flag bit marking the run as bitpacked.
	var (
		setBits   = b.sets << 7
		widthBits = uint64(b.width-1) << 1
	)
	if err := streamio.WriteUvarint(w, setBits|widthBits|1); err != nil {
		return err
	}

	written, err := w.Write(b.data)
	if err != nil {
		return err
	}
	if written != len(b.data) {
		return fmt.Errorf("short write: %d != %d", written, len(b.data))
	}

	// Keep the backing array of data around for the next run.
	b.width, b.sets, b.data = 0, 0, b.data[:0]
	return nil
}
// Reset drops all buffered sets without writing them. The backing array of
// the data slice is retained for reuse.
func (b *bitpackBuffer) Reset() {
	b.data = b.data[:0]
	b.sets, b.width = 0, 0
}
// bitmapDecoder decodes boolean presence values from a bitmap-encoded byte
// slice. It assumes values are encoded as 0/1 and that bitpacked runs use a
// width of 1 (one byte holds 8 values, LSB-first). Runs with any other width,
// and RLE runs with any other value, are rejected with an error while
// decoding.
type bitmapDecoder struct {
	data []byte // Encoded bitmap to read from.

	// Like [bitmapEncoder], bitmapDecoder is a basic state machine with four
	// states:
	//
	// READY          The default state; the decoder needs to pull a new run
	//                header and move to RLE or BITPACK-READY.
	//
	// RLE            The decoder is in the middle of an RLE-encoded run. Active
	//                when runLength>0. Values of the run are emitted in bulk;
	//                runLength decreases by the number of values emitted, each
	//                of which is runValue.
	//
	// BITPACK-READY  The decoder is ready to read a new bitpacked set. Active
	//                when sets>0 and setSize==0. The decoder needs to pull the
	//                next set (1 byte), update setSize and decrement sets by 1.
	//
	// BITPACK-SET    The decoder is in the middle of a bitpacked set. Active
	//                when setSize>0. setSize decreases by 1 for each value
	//                emitted, and the next bitpacked value in the set is
	//                returned.
	//
	// bitmapDecoder always starts in the READY state, and the header it pulls
	// determines whether it moves to RLE or BITPACK-READY. After fully consuming
	// a run, it reverts back to READY.

	off int // Byte offset into data.

	runValue  bool   // Value of the current RLE run.
	runLength uint64 // Remaining values in the current RLE run.

	sets    int  // Number of bitpacked sets left to read, each of which contains 8 elements.
	setSize byte // Number of values left in the current bitpacked set.
	set     byte // Current bitpacked set byte (8 values, LSB-first).
}
// Compile-time check that *bitmapDecoder implements valueDecoder.
var _ valueDecoder = (*bitmapDecoder)(nil)
// newBitmapDecoder returns a bitmapDecoder in the READY state, positioned at
// the start of data.
func newBitmapDecoder(data []byte) *bitmapDecoder {
	dec := new(bitmapDecoder)
	dec.data = data
	return dec
}
// PhysicalType reports the physical type associated with the decoder, which
// is always [datasetmd.PHYSICAL_TYPE_UINT64].
func (*bitmapDecoder) PhysicalType() datasetmd.PhysicalType {
	return datasetmd.PHYSICAL_TYPE_UINT64
}
// EncodingType reports the encoding understood by the decoder, which is
// always [datasetmd.ENCODING_TYPE_BITMAP].
func (*bitmapDecoder) EncodingType() datasetmd.EncodingType {
	return datasetmd.ENCODING_TYPE_BITMAP
}
// Decode decodes up to count values into a bitmap obtained from alloc and
// returns it wrapped in a boolean array built with [columnar.MakeBool]. The
// array is returned together with whatever error
// [bitmapDecoder.DecodeTo] reports; at the end of the stream that is
// [io.EOF], and the array holds the values decoded before it.
func (dec *bitmapDecoder) Decode(alloc *memory.Allocator, count int) (columnar.Array, error) {
	values := memory.MakeBitmap(alloc, count)
	values.Grow(count)

	decodeErr := dec.DecodeTo(&values, count)

	arr := columnar.MakeBool(values, memory.Bitmap{})
	return arr, decodeErr
}
// DecodeTo decodes up to count values from dec into bm. bm is grown to hold
// count values and resized to zero before decoding, so it is left holding
// only the values decoded by this call.
//
// DecodeTo returns [io.EOF] if the data runs out before count values were
// decoded; bm then holds the values decoded so far. It returns another error
// if a bitpacked run has a width other than 1 or an RLE run has a value other
// than 0 or 1. A run header is only consumed once it has been read and
// validated in full, so a failed call does not leave the decoder in the
// middle of a half-parsed run.
//
// NOTE(review): count is converted to uint64 without a sign check; callers
// presumably never pass a negative count.
func (dec *bitmapDecoder) DecodeTo(bm *memory.Bitmap, count int) error {
	// Work on local copies of the decoder state and write them back on
	// return, including error returns.
	var (
		runLength    = dec.runLength
		sets         = dec.sets
		setSize      = dec.setSize
		off          = dec.off
		data         = dec.data
		set          = dec.set
		runValue     = dec.runValue
		leftToDecode = uint64(count)
	)
	defer func() {
		dec.runLength = runLength
		dec.sets = sets
		dec.setSize = setSize
		dec.off = off
		dec.set = set
		dec.runValue = runValue
	}()

	bm.Grow(count)
	bm.Resize(0)

	for leftToDecode > 0 {
		switch {
		case runLength == 0 && sets == 0 && setSize == 0: // READY
			if off >= len(data) {
				return io.EOF
			}
			header, headerSize := binary.Uvarint(data[off:])
			if headerSize <= 0 {
				return io.EOF
			}

			if header&1 == 1 {
				// Start of a bitpacked run. The header stores width-1 in
				// 6 bits.
				setWidth := int((header>>1)&0x3f) + 1
				// only support bool values encoded as ints
				if setWidth != 1 {
					return fmt.Errorf("set width is supposed to be 1, got %d", setWidth)
				}

				off += headerSize
				sets = int(header >> 7)
				setSize = 0 // Sets are loaded one byte at a time in the BITPACK-READY state.
				set = 0
			} else {
				// RLE run: the header is followed by the repeated value.
				valueOff := off + headerSize
				if valueOff >= len(data) {
					return io.EOF
				}
				val, valueSize := binary.Uvarint(data[valueOff:])
				if valueSize <= 0 {
					return io.EOF
				}
				// only support bool values encoded as ints
				if val != 0 && val != 1 {
					return fmt.Errorf("unsupported RLE value %d", val)
				}

				off = valueOff + valueSize
				runLength = header >> 1
				runValue = val > 0
			}
			continue

		case runLength > 0: // RLE
			// Emit as much of the run as the caller asked for in one go.
			decoded := min(runLength, leftToDecode)
			bm.AppendCountUnsafe(runValue, int(decoded))
			runLength -= decoded
			leftToDecode -= decoded
			continue

		case sets > 0 && setSize == 0: // BITPACK-READY
			if off >= len(data) {
				return io.EOF
			}
			set = data[off]
			off++
			setSize = 8 // Always 8 elements in each set.
			sets--
			continue

		case setSize > 0: // BITPACK-SET
			// we expect setWidth to be 1 because we currently support only
			// boolean values encoded as ints (0/1). Values are stored
			// LSB-first, so the next value sits at bit 8-setSize.
			elem := 8 - setSize
			bm.AppendUnsafe((set>>elem)&1 == 1)
			setSize--
			leftToDecode--

		default:
			panic("dataset.bitmapDecoder: invalid state")
		}
	}

	return nil
}
// Reset returns dec to the READY state at the start of data, discarding any
// run that was in progress.
func (dec *bitmapDecoder) Reset(data []byte) {
	// Every other field is cleared to its zero value.
	*dec = bitmapDecoder{data: data}
}