chore(dataobj): Decode values in batches (#16418)

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
pull/16410/head
Christian Haudum 10 months ago committed by GitHub
parent c92e339bf1
commit f6fcc1194e
  1. pkg/dataobj/internal/dataset/page_iter.go (53)
  2. pkg/dataobj/internal/dataset/page_test.go (48)
  3. pkg/dataobj/internal/dataset/value_encoding.go (7)
  4. pkg/dataobj/internal/dataset/value_encoding_bitmap.go (31)
  5. pkg/dataobj/internal/dataset/value_encoding_bitmap_test.go (72)
  6. pkg/dataobj/internal/dataset/value_encoding_delta.go (32)
  7. pkg/dataobj/internal/dataset/value_encoding_delta_test.go (40)
  8. pkg/dataobj/internal/dataset/value_encoding_plain.go (31)
  9. pkg/dataobj/internal/dataset/value_encoding_plain_test.go (33)

@ -10,6 +10,8 @@ import (
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
)
const decodeBufferSize = 128
func iterMemPage(p *MemPage, valueType datasetmd.ValueType, compressionType datasetmd.CompressionType) result.Seq[Value] {
return result.Iter(func(yield func(Value) bool) error {
presenceReader, valuesReader, err := p.reader(compressionType)
@ -24,25 +26,50 @@ func iterMemPage(p *MemPage, valueType datasetmd.ValueType, compressionType data
return fmt.Errorf("no decoder available for %s/%s", valueType, p.Info.Encoding)
}
var iB, iV = decodeBufferSize, decodeBufferSize // current index of buffers for presence values and values
var nB, nV = decodeBufferSize, decodeBufferSize // size of buffers for presence values and values
bufB, bufV := make([]Value, decodeBufferSize), make([]Value, decodeBufferSize) // buffers for presence values and values
for {
var value Value
present, err := presenceDec.Decode()
if errors.Is(err, io.EOF) {
return nil
} else if err != nil {
return fmt.Errorf("decoding presence bitmap: %w", err)
} else if present.Type() != datasetmd.VALUE_TYPE_UINT64 {
return fmt.Errorf("unexpected presence type %s", present.Type())
// There are no decoded presence values left in the buffer, so read a new
// batch of up to decodeBufferSize values and reset the read index into the
// buffer.
if iB >= nB-1 {
nB, err = presenceDec.Decode(bufB[:decodeBufferSize])
bufB = bufB[:nB]
iB = 0
if nB == 0 || errors.Is(err, io.EOF) {
return nil
} else if err != nil {
return fmt.Errorf("decoding presence bitmap: %w", err)
}
} else {
iB++
}
// value is currently nil. If the presence bitmap says our row has a
// value, we decode it into value.
if present.Uint64() == 1 {
value, err = valuesDec.Decode()
if err != nil {
return fmt.Errorf("decoding value: %w", err)
if bufB[iB].Uint64() == 0 { //nolint:revive
// If the presence bitmap says our row has no value, we need to yield a
// nil value, so do nothing.
} else if bufB[iB].Uint64() == 1 {
// If the presence bitmap says our row has a value, we decode it.
// There are no decoded values left in the buffer, so read a new batch of up
// to decodeBufferSize values and reset the read index into the buffer.
if iV >= nV-1 {
nV, err = valuesDec.Decode(bufV[:decodeBufferSize])
bufV = bufV[:nV]
iV = 0
if nV == 0 || errors.Is(err, io.EOF) {
return fmt.Errorf("too few values to decode: expected at least %d, got %d", len(bufB), nV)
} else if err != nil {
return fmt.Errorf("decoding value: %w", err)
}
} else {
iV++
}
value = bufV[iV]
}
if !yield(value) {

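The refill loop added to iterMemPage above follows a general batched-read pattern. Below is a minimal standalone sketch of that pattern, assuming only the new Decode(s []Value) (int, error) contract; the batchDecoder interface, the Value stand-in, and the drain helper are hypothetical names used for illustration and are not part of this commit.

// Sketch only: batched draining of a decoder, mirroring the refill loop in iterMemPage.
package batchexample

import (
	"errors"
	"fmt"
	"io"
)

// Value stands in for dataset.Value.
type Value struct{ U uint64 }

// batchDecoder is the assumed shape of the new decoder contract.
type batchDecoder interface {
	// Decode fills s with up to len(s) values, returning how many were written.
	// At the end of the stream it returns 0, io.EOF.
	Decode(s []Value) (int, error)
}

const decodeBufferSize = 128

// drain reads all remaining values from dec in batches of decodeBufferSize and
// passes each decoded value to fn.
func drain(dec batchDecoder, fn func(Value)) error {
	buf := make([]Value, decodeBufferSize)
	for {
		n, err := dec.Decode(buf)
		if errors.Is(err, io.EOF) {
			return nil // stream exhausted; EOF is only returned with n == 0
		} else if err != nil {
			return fmt.Errorf("decoding batch: %w", err)
		}
		for _, v := range buf[:n] {
			fn(v)
		}
		if n == 0 {
			return nil // defensive: avoid spinning if a decoder returns 0, nil
		}
	}
}

Batching like this amortizes the per-call overhead of the decoder across up to decodeBufferSize values per Decode call, which is what the commit title refers to.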
@ -11,6 +11,54 @@ import (
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
)
func Benchmark_page_Decode(b *testing.B) {
in := []string{
"hello, world!",
"",
"this is a test of the emergency broadcast system",
"this is only a test",
"if this were a real emergency, you would be instructed to panic",
"but it's not, so don't",
"",
"this concludes the test",
"thank you for your cooperation",
"goodbye",
}
opts := BuilderOptions{
PageSizeHint: 1 << 30, // 1GiB
Value: datasetmd.VALUE_TYPE_STRING,
Compression: datasetmd.COMPRESSION_TYPE_ZSTD,
Encoding: datasetmd.ENCODING_TYPE_PLAIN,
}
builder, err := newPageBuilder(opts)
require.NoError(b, err)
for i := range 1_000_000 {
s := in[i%len(in)]
require.True(b, builder.Append(StringValue(s)))
}
page, err := builder.Flush()
require.NoError(b, err)
b.ReportAllocs()
b.ResetTimer()
for range b.N {
_, values, err := page.reader(datasetmd.COMPRESSION_TYPE_ZSTD)
if err != nil {
b.Fatal(err)
}
if _, err := io.Copy(io.Discard, values); err != nil {
b.Fatal(err)
} else if err := values.Close(); err != nil {
b.Fatal(err)
}
}
}
func Benchmark_pageBuilder_WriteRead(b *testing.B) {
in := []string{
"hello, world!",

@ -40,9 +40,10 @@ type valueDecoder interface {
// EncodingType returns the encoding type used by the valueDecoder.
EncodingType() datasetmd.EncodingType
// Decode decodes an individual [Value]. Decode returns an error if decoding
// fails.
Decode() (Value, error)
// Decode decodes up to len(s) values, storing the results into s. The
// number of decoded values is returned, followed by an error (if any).
// At the end of the stream, Decode returns 0, [io.EOF].
Decode(s []Value) (int, error)
// Reset discards any state and resets the valueDecoder to read from r. This
// permits reusing a valueDecoder rather than allocating a new one.

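Each concrete decoder (bitmap, delta, plain) satisfies this interface by wrapping its existing single-value decode method in the same loop. The helper below is a hypothetical generic rendering of that shared shape, shown for illustration; the commit itself repeats the loop inside each decoder rather than introducing such a helper, and decodeBatch and the Value stand-in are not real identifiers in the code base.

// Sketch only: the shape of the batched Decode wrappers added in this commit.
package decodersketch

import (
	"errors"
	"io"
)

// Value stands in for dataset.Value.
type Value struct{ U uint64 }

// decodeBatch fills s by repeatedly calling decodeOne, which decodes a single
// value. It returns the number of values written. At the end of the stream it
// returns 0, io.EOF; a batch that hits EOF partway through returns the count
// decoded so far with a nil error.
func decodeBatch(s []Value, decodeOne func() (Value, error)) (int, error) {
	if len(s) == 0 {
		return 0, nil
	}
	for i := range s {
		v, err := decodeOne()
		if errors.Is(err, io.EOF) {
			if i == 0 {
				return 0, io.EOF
			}
			return i, nil
		} else if err != nil {
			return i, err
		}
		s[i] = v
	}
	return len(s), nil
}

As in the bitmapDecoder, deltaDecoder, and plainDecoder wrappers below, EOF reached mid-batch is swallowed and surfaced as a clean 0, io.EOF on the next call.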
@ -1,6 +1,7 @@
package dataset
import (
"errors"
"fmt"
"io"
"math/bits"
@ -500,8 +501,34 @@ func (dec *bitmapDecoder) EncodingType() datasetmd.EncodingType {
return datasetmd.ENCODING_TYPE_BITMAP
}
// Decode reads the next uint64 value from the stream.
func (dec *bitmapDecoder) Decode() (Value, error) {
// Decode decodes up to len(s) values, storing the results into s. The
// number of decoded values is returned, followed by an error (if any).
// At the end of the stream, Decode returns 0, [io.EOF].
func (dec *bitmapDecoder) Decode(s []Value) (int, error) {
if len(s) == 0 {
return 0, nil
}
var err error
var v Value
for i := range s {
v, err = dec.decode()
if errors.Is(err, io.EOF) {
if i == 0 {
return 0, io.EOF
}
return i, nil
} else if err != nil {
return i, err
}
s[i] = v
}
return len(s), nil
}
// decode reads the next uint64 value from the stream.
func (dec *bitmapDecoder) decode() (Value, error) {
// See comment inside [bitmapDecoder] for the state machine details.
NextState:

@ -2,6 +2,8 @@ package dataset
import (
"bytes"
"errors"
"io"
"math"
"math/rand"
"testing"
@ -13,8 +15,9 @@ func Test_bitmap(t *testing.T) {
var buf bytes.Buffer
var (
enc = newBitmapEncoder(&buf)
dec = newBitmapDecoder(&buf)
enc = newBitmapEncoder(&buf)
dec = newBitmapDecoder(&buf)
decBuf = make([]Value, batchSize)
)
count := 1500
@ -25,10 +28,15 @@ func Test_bitmap(t *testing.T) {
t.Logf("Buffer size: %d", buf.Len())
for range count {
v, err := dec.Decode()
for {
n, err := dec.Decode(decBuf[:batchSize])
if errors.Is(err, io.EOF) {
break
}
require.NoError(t, err)
require.Equal(t, uint64(1), v.Uint64())
for _, v := range decBuf[:n] {
require.Equal(t, uint64(1), v.Uint64())
}
}
}
@ -36,8 +44,9 @@ func Test_bitmap_bitpacking(t *testing.T) {
var buf bytes.Buffer
var (
enc = newBitmapEncoder(&buf)
dec = newBitmapDecoder(&buf)
enc = newBitmapEncoder(&buf)
dec = newBitmapDecoder(&buf)
decBuf = make([]Value, batchSize)
)
expect := []uint64{0, 1, 2, 3, 4, 5, 6, 7}
@ -47,10 +56,15 @@ func Test_bitmap_bitpacking(t *testing.T) {
require.NoError(t, enc.Flush())
var actual []uint64
for range len(expect) {
v, err := dec.Decode()
for {
n, err := dec.Decode(decBuf[:batchSize])
if errors.Is(err, io.EOF) {
break
}
require.NoError(t, err)
actual = append(actual, v.Uint64())
for _, v := range decBuf[:n] {
actual = append(actual, v.Uint64())
}
}
require.NoError(t, enc.Flush())
@ -61,8 +75,9 @@ func Test_bitmap_bitpacking_partial(t *testing.T) {
var buf bytes.Buffer
var (
enc = newBitmapEncoder(&buf)
dec = newBitmapDecoder(&buf)
enc = newBitmapEncoder(&buf)
dec = newBitmapDecoder(&buf)
decBuf = make([]Value, batchSize)
)
expect := []uint64{0, 1, 2, 3, 4}
@ -72,10 +87,15 @@ func Test_bitmap_bitpacking_partial(t *testing.T) {
require.NoError(t, enc.Flush())
var actual []uint64
for range len(expect) {
v, err := dec.Decode()
for {
n, err := dec.Decode(decBuf[:batchSize])
if errors.Is(err, io.EOF) {
break
}
require.NoError(t, err)
actual = append(actual, v.Uint64())
for _, v := range decBuf[:n] {
actual = append(actual, v.Uint64())
}
}
require.Equal(t, expect, actual)
@ -99,8 +119,9 @@ func Fuzz_bitmap(f *testing.F) {
var buf bytes.Buffer
var (
enc = newBitmapEncoder(&buf)
dec = newBitmapDecoder(&buf)
enc = newBitmapEncoder(&buf)
dec = newBitmapDecoder(&buf)
decBuf = make([]Value, batchSize)
)
var numbers []uint64
@ -117,10 +138,15 @@ func Fuzz_bitmap(f *testing.F) {
require.NoError(t, enc.Flush())
var actual []uint64
for i := 0; i < count; i++ {
v, err := dec.Decode()
for {
n, err := dec.Decode(decBuf[:batchSize])
if errors.Is(err, io.EOF) {
break
}
require.NoError(t, err)
actual = append(actual, v.Uint64())
for _, v := range decBuf[:n] {
actual = append(actual, v.Uint64())
}
}
require.Equal(t, numbers, actual)
@ -205,7 +231,7 @@ func benchmarkBitmapDecoder(b *testing.B, width int) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, _ = dec.Decode()
_, _ = dec.decode()
}
})
@ -224,7 +250,7 @@ func benchmarkBitmapDecoder(b *testing.B, width int) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, _ = dec.Decode()
_, _ = dec.decode()
}
})
@ -244,7 +270,7 @@ func benchmarkBitmapDecoder(b *testing.B, width int) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, _ = dec.Decode()
_, _ = dec.decode()
}
})
}

@ -1,7 +1,9 @@
package dataset
import (
"errors"
"fmt"
"io"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
@ -92,8 +94,34 @@ func (dec *deltaDecoder) EncodingType() datasetmd.EncodingType {
return datasetmd.ENCODING_TYPE_DELTA
}
// Decode decodes the next value.
func (dec *deltaDecoder) Decode() (Value, error) {
// Decode decodes up to len(s) values, storing the results into s. The
// number of decoded values is returned, followed by an error (if any).
// At the end of the stream, Decode returns 0, [io.EOF].
func (dec *deltaDecoder) Decode(s []Value) (int, error) {
if len(s) == 0 {
return 0, nil
}
var err error
var v Value
for i := range s {
v, err = dec.decode()
if errors.Is(err, io.EOF) {
if i == 0 {
return 0, io.EOF
}
return i, nil
} else if err != nil {
return i, err
}
s[i] = v
}
return len(s), nil
}
// decode reads the next delta-encoded value from the stream.
func (dec *deltaDecoder) decode() (Value, error) {
delta, err := streamio.ReadVarint(dec.r)
if err != nil {
return Int64Value(dec.prev), err

@ -2,6 +2,8 @@ package dataset
import (
"bytes"
"errors"
"io"
"math"
"math/rand"
"testing"
@ -22,8 +24,9 @@ func Test_delta(t *testing.T) {
var buf bytes.Buffer
var (
enc = newDeltaEncoder(&buf)
dec = newDeltaDecoder(&buf)
enc = newDeltaEncoder(&buf)
dec = newDeltaDecoder(&buf)
decBuf = make([]Value, batchSize)
)
for _, num := range numbers {
@ -31,10 +34,15 @@ func Test_delta(t *testing.T) {
}
var actual []int64
for range len(numbers) {
v, err := dec.Decode()
for {
n, err := dec.Decode(decBuf[:batchSize])
if errors.Is(err, io.EOF) {
break
}
require.NoError(t, err)
actual = append(actual, v.Int64())
for _, v := range decBuf[:n] {
actual = append(actual, v.Int64())
}
}
require.Equal(t, numbers, actual)
@ -54,8 +62,9 @@ func Fuzz_delta(f *testing.F) {
var buf bytes.Buffer
var (
enc = newDeltaEncoder(&buf)
dec = newDeltaDecoder(&buf)
enc = newDeltaEncoder(&buf)
dec = newDeltaDecoder(&buf)
decBuf = make([]Value, batchSize)
)
var numbers []int64
@ -66,10 +75,15 @@ func Fuzz_delta(f *testing.F) {
}
var actual []int64
for i := 0; i < count; i++ {
v, err := dec.Decode()
for {
n, err := dec.Decode(decBuf[:batchSize])
if errors.Is(err, io.EOF) {
break
}
require.NoError(t, err)
actual = append(actual, v.Int64())
for _, v := range decBuf[:n] {
actual = append(actual, v.Int64())
}
}
require.Equal(t, numbers, actual)
@ -126,7 +140,7 @@ func Benchmark_deltaDecoder_Decode(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, _ = dec.Decode()
_, _ = dec.decode()
}
})
@ -148,7 +162,7 @@ func Benchmark_deltaDecoder_Decode(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, _ = dec.Decode()
_, _ = dec.decode()
}
})
@ -168,7 +182,7 @@ func Benchmark_deltaDecoder_Decode(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, _ = dec.Decode()
_, _ = dec.decode()
}
})
}

@ -2,6 +2,7 @@ package dataset
import (
"encoding/binary"
"errors"
"fmt"
"io"
"unsafe"
@ -95,8 +96,34 @@ func (dec *plainDecoder) EncodingType() datasetmd.EncodingType {
return datasetmd.ENCODING_TYPE_PLAIN
}
// Decode decodes a string.
func (dec *plainDecoder) Decode() (Value, error) {
// Decode decodes up to len(s) values, storing the results into s. The
// number of decoded values is returned, followed by an error (if any).
// At the end of the stream, Decode returns 0, [io.EOF].
func (dec *plainDecoder) Decode(s []Value) (int, error) {
if len(s) == 0 {
return 0, nil
}
var err error
var v Value
for i := range s {
v, err = dec.decode()
if errors.Is(err, io.EOF) {
if i == 0 {
return 0, io.EOF
}
return i, nil
} else if err != nil {
return i, err
}
s[i] = v
}
return len(s), nil
}
// decode decodes a string.
func (dec *plainDecoder) decode() (Value, error) {
sz, err := binary.ReadUvarint(dec.r)
if err != nil {
return StringValue(""), err

@ -19,12 +19,15 @@ var testStrings = []string{
"baz",
}
var batchSize = 64
func Test_plainEncoder(t *testing.T) {
var buf bytes.Buffer
var (
enc = newPlainEncoder(&buf)
dec = newPlainDecoder(&buf)
enc = newPlainEncoder(&buf)
dec = newPlainDecoder(&buf)
decBuf = make([]Value, batchSize)
)
for _, v := range testStrings {
@ -34,13 +37,15 @@ func Test_plainEncoder(t *testing.T) {
var out []string
for {
str, err := dec.Decode()
n, err := dec.Decode(decBuf[:batchSize])
if errors.Is(err, io.EOF) {
break
} else if err != nil {
t.Fatal(err)
}
out = append(out, str.String())
for _, v := range decBuf[:n] {
out = append(out, v.String())
}
}
require.Equal(t, testStrings, out)
@ -50,8 +55,9 @@ func Test_plainEncoder_partialRead(t *testing.T) {
var buf bytes.Buffer
var (
enc = newPlainEncoder(&buf)
dec = newPlainDecoder(&oneByteReader{&buf})
enc = newPlainEncoder(&buf)
dec = newPlainDecoder(&oneByteReader{&buf})
decBuf = make([]Value, batchSize)
)
for _, v := range testStrings {
@ -61,13 +67,15 @@ func Test_plainEncoder_partialRead(t *testing.T) {
var out []string
for {
str, err := dec.Decode()
n, err := dec.Decode(decBuf[:batchSize])
if errors.Is(err, io.EOF) {
break
} else if err != nil {
t.Fatal(err)
}
out = append(out, str.String())
for _, v := range decBuf[:n] {
out = append(out, v.String())
}
}
require.Equal(t, testStrings, out)
@ -87,19 +95,20 @@ func Benchmark_plainDecoder_Decode(b *testing.B) {
buf := bytes.NewBuffer(make([]byte, 0, 1024)) // Large enough to avoid reallocations.
var (
enc = newPlainEncoder(buf)
dec = newPlainDecoder(buf)
enc = newPlainEncoder(buf)
dec = newPlainDecoder(buf)
decBuf = make([]Value, batchSize)
)
for _, v := range testStrings {
require.NoError(b, enc.Encode(StringValue(v)))
}
var err error
b.ResetTimer()
for i := 0; i < b.N; i++ {
for {
var err error
_, err = dec.Decode()
_, err = dec.Decode(decBuf[:batchSize])
if errors.Is(err, io.EOF) {
break
} else if err != nil {
