diff --git a/pkg/dataobj/internal/dataset/page_iter.go b/pkg/dataobj/internal/dataset/page_iter.go index 442e104135..65aa0deb98 100644 --- a/pkg/dataobj/internal/dataset/page_iter.go +++ b/pkg/dataobj/internal/dataset/page_iter.go @@ -10,6 +10,8 @@ import ( "github.com/grafana/loki/v3/pkg/dataobj/internal/result" ) +const decodeBufferSize = 128 + func iterMemPage(p *MemPage, valueType datasetmd.ValueType, compressionType datasetmd.CompressionType) result.Seq[Value] { return result.Iter(func(yield func(Value) bool) error { presenceReader, valuesReader, err := p.reader(compressionType) @@ -24,25 +26,50 @@ func iterMemPage(p *MemPage, valueType datasetmd.ValueType, compressionType data return fmt.Errorf("no decoder available for %s/%s", valueType, p.Info.Encoding) } + var iB, iV = decodeBufferSize, decodeBufferSize // current index of buffers for presence values and values + var nB, nV = decodeBufferSize, decodeBufferSize // size of buffers for presence values and values + bufB, bufV := make([]Value, decodeBufferSize), make([]Value, decodeBufferSize) // buffers for presence values and values + for { var value Value - present, err := presenceDec.Decode() - if errors.Is(err, io.EOF) { - return nil - } else if err != nil { - return fmt.Errorf("decoding presence bitmap: %w", err) - } else if present.Type() != datasetmd.VALUE_TYPE_UINT64 { - return fmt.Errorf("unexpected presence type %s", present.Type()) + // There are no decoded presence values in the buffer, so read a new + // batch of up to decodeBufferSize values and reset the current index of the + // value inside the buffer. + if iB >= nB-1 { + nB, err = presenceDec.Decode(bufB[:decodeBufferSize]) + bufB = bufB[:nB] + iB = 0 + if nB == 0 || errors.Is(err, io.EOF) { + return nil + } else if err != nil { + return fmt.Errorf("decoding presence bitmap: %w", err) + } + } else { + iB++ } - // value is currently nil. If the presence bitmap says our row has a - // value, we decode it into value. 
- if present.Uint64() == 1 { - value, err = valuesDec.Decode() - if err != nil { - return fmt.Errorf("decoding value: %w", err) + if bufB[iB].Uint64() == 0 { //nolint:revive + // If the presence bitmap says our row has no value, we need to yield a + // nil value, so do nothing. + } else if bufB[iB].Uint64() == 1 { + // If the presence bitmap says our row has a value, we decode it. + + // There are no decoded values in the buffer, so read a new batch up to + // decodeBufferSize values and reset the current index of the value inside the buffer. + if iV >= nV-1 { + nV, err = valuesDec.Decode(bufV[:decodeBufferSize]) + bufV = bufV[:nV] + iV = 0 + if nV == 0 || errors.Is(err, io.EOF) { + return fmt.Errorf("too few values to decode: expected at least %d, got %d", len(bufB), nV) + } else if err != nil { + return fmt.Errorf("decoding value: %w", err) + } + } else { + iV++ } + value = bufV[iV] } if !yield(value) { diff --git a/pkg/dataobj/internal/dataset/page_test.go b/pkg/dataobj/internal/dataset/page_test.go index 869ace87bd..fbdfd27b85 100644 --- a/pkg/dataobj/internal/dataset/page_test.go +++ b/pkg/dataobj/internal/dataset/page_test.go @@ -11,6 +11,54 @@ import ( "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" ) +func Benchmark_page_Decode(b *testing.B) { + in := []string{ + "hello, world!", + "", + "this is a test of the emergency broadcast system", + "this is only a test", + "if this were a real emergency, you would be instructed to panic", + "but it's not, so don't", + "", + "this concludes the test", + "thank you for your cooperation", + "goodbye", + } + + opts := BuilderOptions{ + PageSizeHint: 1 << 30, // 1GiB + Value: datasetmd.VALUE_TYPE_STRING, + Compression: datasetmd.COMPRESSION_TYPE_ZSTD, + Encoding: datasetmd.ENCODING_TYPE_PLAIN, + } + builder, err := newPageBuilder(opts) + require.NoError(b, err) + + for i := range 1_000_000 { + s := in[i%len(in)] + require.True(b, builder.Append(StringValue(s))) + } + + page, err := builder.Flush() + 
require.NoError(b, err) + + b.ReportAllocs() + b.ResetTimer() + + for range b.N { + _, values, err := page.reader(datasetmd.COMPRESSION_TYPE_ZSTD) + if err != nil { + b.Fatal() + } + if _, err := io.Copy(io.Discard, values); err != nil { + b.Fatal(err) + } else if err := values.Close(); err != nil { + b.Fatal(err) + } + } + +} + func Benchmark_pageBuilder_WriteRead(b *testing.B) { in := []string{ "hello, world!", diff --git a/pkg/dataobj/internal/dataset/value_encoding.go b/pkg/dataobj/internal/dataset/value_encoding.go index b861a0142f..ea4fe66e25 100644 --- a/pkg/dataobj/internal/dataset/value_encoding.go +++ b/pkg/dataobj/internal/dataset/value_encoding.go @@ -40,9 +40,10 @@ type valueDecoder interface { // EncodingType returns the encoding type used by the valueDecoder. EncodingType() datasetmd.EncodingType - // Decode decodes an individual [Value]. Decode returns an error if decoding - // fails. - Decode() (Value, error) + // Decode decodes up to len(s) values, storing the results into s. The + // number of decoded values is returned, followed by an error (if any). + // At the end of the stream, Decode returns 0, [io.EOF]. + Decode(s []Value) (int, error) // Reset discards any state and resets the valueDecoder to read from r. This // permits reusing a valueDecoder rather than allocating a new one. diff --git a/pkg/dataobj/internal/dataset/value_encoding_bitmap.go b/pkg/dataobj/internal/dataset/value_encoding_bitmap.go index b037c6b97e..51e3ffc1b5 100644 --- a/pkg/dataobj/internal/dataset/value_encoding_bitmap.go +++ b/pkg/dataobj/internal/dataset/value_encoding_bitmap.go @@ -1,6 +1,7 @@ package dataset import ( + "errors" "fmt" "io" "math/bits" @@ -500,8 +501,34 @@ func (dec *bitmapDecoder) EncodingType() datasetmd.EncodingType { return datasetmd.ENCODING_TYPE_BITMAP } -// Decode reads the next uint64 value from the stream. -func (dec *bitmapDecoder) Decode() (Value, error) { +// Decode decodes up to len(s) values, storing the results into s. 
The +// number of decoded values is returned, followed by an error (if any). +// At the end of the stream, Decode returns 0, [io.EOF]. +func (dec *bitmapDecoder) Decode(s []Value) (int, error) { + if len(s) == 0 { + return 0, nil + } + + var err error + var v Value + + for i := range s { + v, err = dec.decode() + if errors.Is(err, io.EOF) { + if i == 0 { + return 0, io.EOF + } + return i, nil + } else if err != nil { + return i, err + } + s[i] = v + } + return len(s), nil +} + +// decode reads the next uint64 value from the stream. +func (dec *bitmapDecoder) decode() (Value, error) { // See comment inside [bitmapDecoder] for the state machine details. NextState: diff --git a/pkg/dataobj/internal/dataset/value_encoding_bitmap_test.go b/pkg/dataobj/internal/dataset/value_encoding_bitmap_test.go index 34264b0ea4..535d5fe301 100644 --- a/pkg/dataobj/internal/dataset/value_encoding_bitmap_test.go +++ b/pkg/dataobj/internal/dataset/value_encoding_bitmap_test.go @@ -2,6 +2,8 @@ package dataset import ( "bytes" + "errors" + "io" "math" "math/rand" "testing" @@ -13,8 +15,9 @@ func Test_bitmap(t *testing.T) { var buf bytes.Buffer var ( - enc = newBitmapEncoder(&buf) - dec = newBitmapDecoder(&buf) + enc = newBitmapEncoder(&buf) + dec = newBitmapDecoder(&buf) + decBuf = make([]Value, batchSize) ) count := 1500 @@ -25,10 +28,15 @@ func Test_bitmap(t *testing.T) { t.Logf("Buffer size: %d", buf.Len()) - for range count { - v, err := dec.Decode() + for { + n, err := dec.Decode(decBuf[:batchSize]) + if errors.Is(err, io.EOF) { + break + } require.NoError(t, err) - require.Equal(t, uint64(1), v.Uint64()) + for _, v := range decBuf[:n] { + require.Equal(t, uint64(1), v.Uint64()) + } } } @@ -36,8 +44,9 @@ func Test_bitmap_bitpacking(t *testing.T) { var buf bytes.Buffer var ( - enc = newBitmapEncoder(&buf) - dec = newBitmapDecoder(&buf) + enc = newBitmapEncoder(&buf) + dec = newBitmapDecoder(&buf) + decBuf = make([]Value, batchSize) ) expect := []uint64{0, 1, 2, 3, 4, 5, 6, 7} @@ 
-47,10 +56,15 @@ func Test_bitmap_bitpacking(t *testing.T) { require.NoError(t, enc.Flush()) var actual []uint64 - for range len(expect) { - v, err := dec.Decode() + for { + n, err := dec.Decode(decBuf[:batchSize]) + if errors.Is(err, io.EOF) { + break + } require.NoError(t, err) - actual = append(actual, v.Uint64()) + for _, v := range decBuf[:n] { + actual = append(actual, v.Uint64()) + } } require.NoError(t, enc.Flush()) @@ -61,8 +75,9 @@ func Test_bitmap_bitpacking_partial(t *testing.T) { var buf bytes.Buffer var ( - enc = newBitmapEncoder(&buf) - dec = newBitmapDecoder(&buf) + enc = newBitmapEncoder(&buf) + dec = newBitmapDecoder(&buf) + decBuf = make([]Value, batchSize) ) expect := []uint64{0, 1, 2, 3, 4} @@ -72,10 +87,15 @@ func Test_bitmap_bitpacking_partial(t *testing.T) { require.NoError(t, enc.Flush()) var actual []uint64 - for range len(expect) { - v, err := dec.Decode() + for { + n, err := dec.Decode(decBuf[:batchSize]) + if errors.Is(err, io.EOF) { + break + } require.NoError(t, err) - actual = append(actual, v.Uint64()) + for _, v := range decBuf[:n] { + actual = append(actual, v.Uint64()) + } } require.Equal(t, expect, actual) @@ -99,8 +119,9 @@ func Fuzz_bitmap(f *testing.F) { var buf bytes.Buffer var ( - enc = newBitmapEncoder(&buf) - dec = newBitmapDecoder(&buf) + enc = newBitmapEncoder(&buf) + dec = newBitmapDecoder(&buf) + decBuf = make([]Value, batchSize) ) var numbers []uint64 @@ -117,10 +138,15 @@ func Fuzz_bitmap(f *testing.F) { require.NoError(t, enc.Flush()) var actual []uint64 - for i := 0; i < count; i++ { - v, err := dec.Decode() + for { + n, err := dec.Decode(decBuf[:batchSize]) + if errors.Is(err, io.EOF) { + break + } require.NoError(t, err) - actual = append(actual, v.Uint64()) + for _, v := range decBuf[:n] { + actual = append(actual, v.Uint64()) + } } require.Equal(t, numbers, actual) @@ -205,7 +231,7 @@ func benchmarkBitmapDecoder(b *testing.B, width int) { b.ResetTimer() for i := 0; i < b.N; i++ { - _, _ = dec.Decode() + _, _ = 
dec.decode() } }) @@ -224,7 +250,7 @@ func benchmarkBitmapDecoder(b *testing.B, width int) { b.ResetTimer() for i := 0; i < b.N; i++ { - _, _ = dec.Decode() + _, _ = dec.decode() } }) @@ -244,7 +270,7 @@ func benchmarkBitmapDecoder(b *testing.B, width int) { b.ResetTimer() for i := 0; i < b.N; i++ { - _, _ = dec.Decode() + _, _ = dec.decode() } }) } diff --git a/pkg/dataobj/internal/dataset/value_encoding_delta.go b/pkg/dataobj/internal/dataset/value_encoding_delta.go index 2028d2bbe6..fc823503bf 100644 --- a/pkg/dataobj/internal/dataset/value_encoding_delta.go +++ b/pkg/dataobj/internal/dataset/value_encoding_delta.go @@ -1,7 +1,9 @@ package dataset import ( + "errors" "fmt" + "io" "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" "github.com/grafana/loki/v3/pkg/dataobj/internal/streamio" @@ -92,8 +94,34 @@ func (dec *deltaDecoder) EncodingType() datasetmd.EncodingType { return datasetmd.ENCODING_TYPE_DELTA } -// Decode decodes the next value. -func (dec *deltaDecoder) Decode() (Value, error) { +// Decode decodes up to len(s) values, storing the results into s. The +// number of decoded values is returned, followed by an error (if any). +// At the end of the stream, Decode returns 0, [io.EOF]. +func (dec *deltaDecoder) Decode(s []Value) (int, error) { + if len(s) == 0 { + return 0, nil + } + + var err error + var v Value + + for i := range s { + v, err = dec.decode() + if errors.Is(err, io.EOF) { + if i == 0 { + return 0, io.EOF + } + return i, nil + } else if err != nil { + return i, err + } + s[i] = v + } + return len(s), nil +} + +// decode reads the next uint64 value from the stream. 
+func (dec *deltaDecoder) decode() (Value, error) { delta, err := streamio.ReadVarint(dec.r) if err != nil { return Int64Value(dec.prev), err diff --git a/pkg/dataobj/internal/dataset/value_encoding_delta_test.go b/pkg/dataobj/internal/dataset/value_encoding_delta_test.go index 0837dbfe26..8f84079ef8 100644 --- a/pkg/dataobj/internal/dataset/value_encoding_delta_test.go +++ b/pkg/dataobj/internal/dataset/value_encoding_delta_test.go @@ -2,6 +2,8 @@ package dataset import ( "bytes" + "errors" + "io" "math" "math/rand" "testing" @@ -22,8 +24,9 @@ func Test_delta(t *testing.T) { var buf bytes.Buffer var ( - enc = newDeltaEncoder(&buf) - dec = newDeltaDecoder(&buf) + enc = newDeltaEncoder(&buf) + dec = newDeltaDecoder(&buf) + decBuf = make([]Value, batchSize) ) for _, num := range numbers { @@ -31,10 +34,15 @@ func Test_delta(t *testing.T) { } var actual []int64 - for range len(numbers) { - v, err := dec.Decode() + for { + n, err := dec.Decode(decBuf[:batchSize]) + if errors.Is(err, io.EOF) { + break + } require.NoError(t, err) - actual = append(actual, v.Int64()) + for _, v := range decBuf[:n] { + actual = append(actual, v.Int64()) + } } require.Equal(t, numbers, actual) @@ -54,8 +62,9 @@ func Fuzz_delta(f *testing.F) { var buf bytes.Buffer var ( - enc = newDeltaEncoder(&buf) - dec = newDeltaDecoder(&buf) + enc = newDeltaEncoder(&buf) + dec = newDeltaDecoder(&buf) + decBuf = make([]Value, batchSize) ) var numbers []int64 @@ -66,10 +75,15 @@ func Fuzz_delta(f *testing.F) { } var actual []int64 - for i := 0; i < count; i++ { - v, err := dec.Decode() + for { + n, err := dec.Decode(decBuf[:batchSize]) + if errors.Is(err, io.EOF) { + break + } require.NoError(t, err) - actual = append(actual, v.Int64()) + for _, v := range decBuf[:n] { + actual = append(actual, v.Int64()) + } } require.Equal(t, numbers, actual) @@ -126,7 +140,7 @@ func Benchmark_deltaDecoder_Decode(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - _, _ = dec.Decode() + _, _ = dec.decode() } }) @@ 
-148,7 +162,7 @@ func Benchmark_deltaDecoder_Decode(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - _, _ = dec.Decode() + _, _ = dec.decode() } }) @@ -168,7 +182,7 @@ func Benchmark_deltaDecoder_Decode(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - _, _ = dec.Decode() + _, _ = dec.decode() } }) } diff --git a/pkg/dataobj/internal/dataset/value_encoding_plain.go b/pkg/dataobj/internal/dataset/value_encoding_plain.go index 25f463b3bf..d6224afbc2 100644 --- a/pkg/dataobj/internal/dataset/value_encoding_plain.go +++ b/pkg/dataobj/internal/dataset/value_encoding_plain.go @@ -2,6 +2,7 @@ package dataset import ( "encoding/binary" + "errors" "fmt" "io" "unsafe" @@ -95,8 +96,34 @@ func (dec *plainDecoder) EncodingType() datasetmd.EncodingType { return datasetmd.ENCODING_TYPE_PLAIN } -// Decode decodes a string. -func (dec *plainDecoder) Decode() (Value, error) { +// Decode decodes up to len(s) values, storing the results into s. The +// number of decoded values is returned, followed by an error (if any). +// At the end of the stream, Decode returns 0, [io.EOF]. +func (dec *plainDecoder) Decode(s []Value) (int, error) { + if len(s) == 0 { + return 0, nil + } + + var err error + var v Value + + for i := range s { + v, err = dec.decode() + if errors.Is(err, io.EOF) { + if i == 0 { + return 0, io.EOF + } + return i, nil + } else if err != nil { + return i, err + } + s[i] = v + } + return len(s), nil +} + +// decode decodes a string. 
+func (dec *plainDecoder) decode() (Value, error) { sz, err := binary.ReadUvarint(dec.r) if err != nil { return StringValue(""), err diff --git a/pkg/dataobj/internal/dataset/value_encoding_plain_test.go b/pkg/dataobj/internal/dataset/value_encoding_plain_test.go index 8540e4f61d..595e0240a6 100644 --- a/pkg/dataobj/internal/dataset/value_encoding_plain_test.go +++ b/pkg/dataobj/internal/dataset/value_encoding_plain_test.go @@ -19,12 +19,15 @@ var testStrings = []string{ "baz", } +var batchSize = 64 + func Test_plainEncoder(t *testing.T) { var buf bytes.Buffer var ( - enc = newPlainEncoder(&buf) - dec = newPlainDecoder(&buf) + enc = newPlainEncoder(&buf) + dec = newPlainDecoder(&buf) + decBuf = make([]Value, batchSize) ) for _, v := range testStrings { @@ -34,13 +37,15 @@ func Test_plainEncoder(t *testing.T) { var out []string for { - str, err := dec.Decode() + n, err := dec.Decode(decBuf[:batchSize]) if errors.Is(err, io.EOF) { break } else if err != nil { t.Fatal(err) } - out = append(out, str.String()) + for _, v := range decBuf[:n] { + out = append(out, v.String()) + } } require.Equal(t, testStrings, out) @@ -50,8 +55,9 @@ func Test_plainEncoder_partialRead(t *testing.T) { var buf bytes.Buffer var ( - enc = newPlainEncoder(&buf) - dec = newPlainDecoder(&oneByteReader{&buf}) + enc = newPlainEncoder(&buf) + dec = newPlainDecoder(&oneByteReader{&buf}) + decBuf = make([]Value, batchSize) ) for _, v := range testStrings { @@ -61,13 +67,15 @@ func Test_plainEncoder_partialRead(t *testing.T) { var out []string for { - str, err := dec.Decode() + n, err := dec.Decode(decBuf[:batchSize]) if errors.Is(err, io.EOF) { break } else if err != nil { t.Fatal(err) } - out = append(out, str.String()) + for _, v := range decBuf[:n] { + out = append(out, v.String()) + } } require.Equal(t, testStrings, out) @@ -87,19 +95,20 @@ func Benchmark_plainDecoder_Decode(b *testing.B) { buf := bytes.NewBuffer(make([]byte, 0, 1024)) // Large enough to avoid reallocations. 
var ( - enc = newPlainEncoder(buf) - dec = newPlainDecoder(buf) + enc = newPlainEncoder(buf) + dec = newPlainDecoder(buf) + decBuf = make([]Value, batchSize) ) for _, v := range testStrings { require.NoError(b, enc.Encode(StringValue(v))) } + var err error b.ResetTimer() for i := 0; i < b.N; i++ { for { - var err error - _, err = dec.Decode() + _, err = dec.Decode(decBuf[:batchSize]) if errors.Is(err, io.EOF) { break } else if err != nil {