diff --git a/pkg/dataobj/internal/dataset/page_reader.go b/pkg/dataobj/internal/dataset/page_reader.go index 340b9fe031..09412c5ec6 100644 --- a/pkg/dataobj/internal/dataset/page_reader.go +++ b/pkg/dataobj/internal/dataset/page_reader.go @@ -6,6 +6,7 @@ import ( "fmt" "io" + "github.com/grafana/loki/v3/pkg/columnar" "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" "github.com/grafana/loki/v3/pkg/dataobj/internal/util/slicegrow" "github.com/grafana/loki/v3/pkg/memory" @@ -23,9 +24,7 @@ type pageReader struct { closer io.Closer presenceDec *bitmapDecoder - valuesDec legacyValueDecoder - - valuesBuf []Value + valuesDec valueDecoder pageRow int64 nextRow int64 @@ -86,14 +85,6 @@ func (pr *pageReader) read(v []Value) (n int, err error) { // need to find a new mechanism to prevent over-allocating. pr.alloc.Reclaim() - // We want to allow decoders to reuse memory of [Value]s in v while allowing - // the caller to retain ownership over that memory; to do this safely, we - // copy memory from v into pr.valuesBuf for our decoders to use. - // - // If we didn't do this, then memory backing [Value]s are owned by both - // pageReader and the caller, which can lead to memory reuse bugs. - pr.valuesBuf = reuseValuesBuffer(pr.valuesBuf, v) - // First read presence values for the next len(v) rows. bm := memory.MakeBitmap(&pr.alloc, len(v)) err = pr.presenceDec.DecodeTo(&bm, len(v)) @@ -115,30 +106,22 @@ func (pr *pageReader) read(v []Value) (n int, err error) { // need to read from the inner page. presentCount := bm.SetCount() + var values columnar.Array + // Now fill up to presentCount values of concrete values. - var valuesCount int if presentCount > 0 { - valuesCount, err = pr.valuesDec.Decode(pr.valuesBuf[:presentCount]) + values, err = pr.valuesDec.Decode(&pr.alloc, presentCount) if err != nil && !errors.Is(err, io.EOF) { return n, err - } else if valuesCount != presentCount { - return n, fmt.Errorf("unexpected number of values: %d, expected: %d", valuesCount, presentCount) + } else if values == nil { + return n, fmt.Errorf("unexpected nil values") + } else if values.Len() != presentCount { + return n, fmt.Errorf("unexpected number of values: %d, expected: %d", values.Len(), presentCount) } } - // Finally, copy over count values into v, setting NULL where appropriate and - // copying from pr.valuesBuf where appropriate. - var valuesIndex int - for i := range count { - if bm.Get(i) { - if valuesIndex >= valuesCount { - return n, fmt.Errorf("unexpected end of values") - } - v[i] = pr.valuesBuf[valuesIndex] - valuesIndex++ - } else { - v[i] = Value{} - } + if err := materializeSparseArray(v, bm, values); err != nil { + return n, err } n += count @@ -146,25 +129,6 @@ func (pr *pageReader) read(v []Value) (n int, err error) { return n, nil } -// reuseValuesBuffer prepares dst for reading up to len(src) values. Non-NULL -// values are appended to dst, with the remainder of the slice set to NULL. -// -// The resulting slice is len(src). -func reuseValuesBuffer(dst []Value, src []Value) []Value { - dst = slicegrow.GrowToCap(dst, len(src)) - dst = dst[:0] - - // We must maintain ordering against the caller slice here. - // Otherwise we can move pointers around which can get reused within a read call. - dst = append(dst, src...) - - filledLength := len(dst) - - dst = dst[:len(src)] - clear(dst[filledLength:]) - return dst -} - func (pr *pageReader) init(ctx context.Context) error { // Close any existing reader from a previous pageReader init. Even though // this also happens in [pageReader.Close], we want to do it here as well in @@ -193,7 +157,7 @@ func (pr *pageReader) init(ctx context.Context) error { if pr.valuesDec == nil || pr.lastPhysicalType != pr.physicalType || pr.lastEncoding != memPage.Desc.Encoding { var ok bool - pr.valuesDec, ok = newValueDecoder(&pr.alloc, pr.physicalType, memPage.Desc.Encoding, openedPage.ValueData) + pr.valuesDec, ok = newValueDecoder(pr.physicalType, memPage.Desc.Encoding, openedPage.ValueData) if !ok { return fmt.Errorf("unsupported value encoding %s/%s", pr.physicalType, memPage.Desc.Encoding) } @@ -209,6 +173,81 @@ func (pr *pageReader) init(ctx context.Context) error { return nil } +// materializeSparseArray materializes a dense array into a sparse [Value] slice +// based on a presence bitmap. +// +// len(dst) must be at least validity.Len(). +func materializeSparseArray(dst []Value, validity memory.Bitmap, denseValues columnar.Array) error { + if len(dst) < validity.Len() { + panic(fmt.Sprintf("invariant broken: dst len (%d) is less than validity len (%d)", len(dst), validity.Len())) + } + + switch arr := denseValues.(type) { + case *columnar.UTF8: + return materializeSparseUTF8(dst, validity, arr) + case *columnar.Int64: + return materializeSparseInt64(dst, validity, arr) + case nil: + return materializeNulls(dst, validity) + default: + panic(fmt.Sprintf("found unexpected type %T", arr)) + } +} + +func materializeSparseUTF8(dst []Value, validity memory.Bitmap, denseValues *columnar.UTF8) error { + var denseIndex int + + for i := range validity.Len() { + if !validity.Get(i) { + dst[i].Zero() + continue + } else if denseIndex >= denseValues.Len() { + return fmt.Errorf("unexpected end of values") + } + + srcBuf := denseValues.Get(denseIndex) + denseIndex++ + + dstBuf := slicegrow.GrowToCap(dst[i].Buffer(), len(srcBuf)) + dstBuf = dstBuf[:len(srcBuf)] + copy(dstBuf, srcBuf) + + dst[i] = BinaryValue(dstBuf) + } + + return nil +} + +func materializeSparseInt64(dst []Value, validity memory.Bitmap, denseValues *columnar.Int64) error { + srcValues := denseValues.Values() + + var srcIndex int + for i := range validity.Len() { + if !validity.Get(i) { + dst[i].Zero() + continue + } else if srcIndex >= len(srcValues) { + return fmt.Errorf("unexpected end of values") + } + + dst[i] = Int64Value(srcValues[srcIndex]) + srcIndex++ + } + + return nil +} + +func materializeNulls(dst []Value, validity memory.Bitmap) error { + if validity.SetCount() > 0 { + return fmt.Errorf("unexpected non-null values") + } + + for i := range validity.Len() { + dst[i].Zero() + } + return nil +} + // Seek sets the row offset for the next Read call, interpreted according to // whence: // diff --git a/pkg/dataobj/internal/dataset/value_encoding.go b/pkg/dataobj/internal/dataset/value_encoding.go index 74b2294559..c49e277b59 100644 --- a/pkg/dataobj/internal/dataset/value_encoding.go +++ b/pkg/dataobj/internal/dataset/value_encoding.go @@ -32,30 +32,8 @@ type valueEncoder interface { Reset(w streamio.Writer) } -// A legacyValueDecoder decodes sequences of [Value] from an underlying -// byte slice. Implementations of encoding types must call registerValueEncoding -// to register themselves. -type legacyValueDecoder interface { - // PhysicalType returns the type of values supported by the decoder. - PhysicalType() datasetmd.PhysicalType - - // EncodingType returns the encoding type used by the decoder. - EncodingType() datasetmd.EncodingType - - // Decode decodes up to len(s) values, storing the results into s. The - // number of decoded values is returned, followed by an error (if any). - // At the end of the stream, Decode returns 0, [io.EOF]. - Decode(s []Value) (int, error) - - // Reset discards any state and resets the decoder to read from data. - // This permits reusing a decoder rather than allocating a new one. - Reset(data []byte) -} - // A valueDecoder reads values from an underlying byte slice. Implementations of // encoding types must call registerValueEncoding to register themselves. -// -// valueDecoder supersedes [legacyValueDecoder]. type valueDecoder interface { // PhysicalType returns the type of values supported by the decoder. PhysicalType() datasetmd.PhysicalType @@ -84,9 +62,8 @@ type ( } registryEntry struct { - NewEncoder func(streamio.Writer) valueEncoder - NewDecoder func([]byte) valueDecoder - NewLegacyDecoder func([]byte) legacyValueDecoder + NewEncoder func(streamio.Writer) valueEncoder + NewDecoder func([]byte) valueDecoder } ) @@ -130,7 +107,7 @@ func newValueEncoder(physicalType datasetmd.PhysicalType, encodingType datasetmd // newValueDecoder creates a new decoder for the specified physicalType and // encodingType. If no encoding is registered for the specified combination of // physicalType and encodingType, newValueDecoder returns nil and false. -func newValueDecoder(alloc *memory.Allocator, physicalType datasetmd.PhysicalType, encodingType datasetmd.EncodingType, data []byte) (legacyValueDecoder, bool) { +func newValueDecoder(physicalType datasetmd.PhysicalType, encodingType datasetmd.EncodingType, data []byte) (valueDecoder, bool) { key := registryKey{ Physical: physicalType, Encoding: encodingType, @@ -142,15 +119,7 @@ func newValueDecoder(alloc *memory.Allocator, physicalType datasetmd.PhysicalTyp switch { case entry.NewDecoder != nil: - dec := &valueDecoderAdapter{ - Alloc: alloc, - Inner: entry.NewDecoder(data), - } - return dec, true - - case entry.NewLegacyDecoder != nil: - return entry.NewLegacyDecoder(data), true - + return entry.NewDecoder(data), true default: return nil, false } diff --git a/pkg/dataobj/internal/dataset/value_encoding_adapter.go b/pkg/dataobj/internal/dataset/value_encoding_adapter.go deleted file mode 100644 index 4a1d91544f..0000000000 --- a/pkg/dataobj/internal/dataset/value_encoding_adapter.go +++ /dev/null @@ -1,80 +0,0 @@ -package dataset - -import ( - "fmt" - - "github.com/grafana/loki/v3/pkg/columnar" - "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" - "github.com/grafana/loki/v3/pkg/dataobj/internal/util/slicegrow" - "github.com/grafana/loki/v3/pkg/memory" -) - -// valueDecoderAdapter implements [legacyValueDecoder] for a newer -// [valueDecoder] implementation. -type valueDecoderAdapter struct { - Alloc *memory.Allocator - Inner valueDecoder -} - -var _ legacyValueDecoder = (*valueDecoderAdapter)(nil) - -// PhysicalType returns the type of values supported by the decoder. -func (a *valueDecoderAdapter) PhysicalType() datasetmd.PhysicalType { return a.Inner.PhysicalType() } - -// EncodingType returns the encoding type used by the decoder. -func (a *valueDecoderAdapter) EncodingType() datasetmd.EncodingType { return a.Inner.EncodingType() } - -// Decode decodes up to len(s) values, storing the results into s. The -// number of decoded values is returned, followed by an error (if any). -// At the end of the stream, Decode returns 0, [io.EOF]. -func (a *valueDecoderAdapter) Decode(s []Value) (n int, err error) { - result, err := a.Inner.Decode(a.Alloc, len(s)) - if result != nil { - n = a.unpackArray(s, result) - } - return n, err -} - -func (a *valueDecoderAdapter) unpackArray(dst []Value, result columnar.Array) int { - switch result := result.(type) { - case *columnar.UTF8: - return a.unpackUTF8(dst, result) - case *columnar.Int64: - return a.unpackInt64(dst, result.Values()) - default: - panic(fmt.Sprintf("legacy decoder adapter found unexpected type %T", result)) - } -} - -func (a *valueDecoderAdapter) unpackUTF8(dst []Value, result *columnar.UTF8) int { - if result.Len() > len(dst) { - panic(fmt.Sprintf("invariant broken: larger src len (%d) than dst (%d)", result.Len(), len(dst))) - } - - for i := range result.Len() { - srcBuf := result.Get(i) - - dstBuf := slicegrow.GrowToCap(dst[i].Buffer(), len(srcBuf)) - dstBuf = dstBuf[:len(srcBuf)] - copy(dstBuf, srcBuf) - - dst[i] = BinaryValue(dstBuf) - } - - return result.Len() -} - -func (a *valueDecoderAdapter) unpackInt64(dst []Value, result []int64) int { - if len(result) > len(dst) { - panic(fmt.Sprintf("invariant broken: larger src len (%d) than dst (%d)", len(result), len(dst))) - } - - for i := range result { - dst[i] = Int64Value(result[i]) - } - return len(result) -} - -// Reset discards any state and resets the decoder to read from data. -// This permits reusing a decoder rather than allocating a new one. -func (a *valueDecoderAdapter) Reset(data []byte) { a.Inner.Reset(data) } diff --git a/pkg/dataobj/internal/dataset/value_encoding_plain_test.go b/pkg/dataobj/internal/dataset/value_encoding_plain_test.go index e8acbc9237..740c704258 100644 --- a/pkg/dataobj/internal/dataset/value_encoding_plain_test.go +++ b/pkg/dataobj/internal/dataset/value_encoding_plain_test.go @@ -58,37 +58,6 @@ func Test_plainBytesEncoder(t *testing.T) { require.Equal(t, testStrings, out) } -func Test_plainBytesDecoder_adapter(t *testing.T) { - var buf bytes.Buffer - - enc := newPlainBytesEncoder(&buf) - for _, v := range testStrings { - require.NoError(t, enc.Encode(BinaryValue([]byte(v)))) - } - - dec := valueDecoderAdapter{ - Alloc: new(memory.Allocator), - Inner: newPlainBytesDecoder(buf.Bytes()), - } - - var out []string - decBuf := make([]Value, batchSize) - for { - n, err := dec.Decode(decBuf[:batchSize]) - if n == 0 && errors.Is(err, io.EOF) { - break - } else if err != nil && !errors.Is(err, io.EOF) { - t.Fatal(err) - } - - for _, v := range decBuf[:n] { - out = append(out, string(v.Binary())) - } - } - - require.Equal(t, testStrings, out) -} - func Benchmark_plainBytesEncoder_Append(b *testing.B) { enc := newPlainBytesEncoder(streamio.Discard)