chore(dataobj): Fix panic in column builder (#19153)

There is an edge case in the column builder where it panics when the AppendNulls() call tries to append more NULL values than the specified MaxPageRows setting when backfilling rows.
In that case the builder would panic because canAppend would always return false even when the previous page was flushed.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
pull/19184/head
Christian Haudum 9 months ago committed by GitHub
parent f91b8b4052
commit 2ecf07d52c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 8
      pkg/dataobj/internal/dataset/column_builder.go
  2. 120
      pkg/dataobj/internal/dataset/column_builder_test.go
  3. 4
      pkg/dataobj/internal/dataset/page_builder.go

@ -102,12 +102,14 @@ func (cb *ColumnBuilder) Append(row int, value Value) error {
return fmt.Errorf("row %d is older than current row %d", row, cb.rows)
}
// We give two attempts to append the data to the buffer; if the buffer is
// We give three attempts to append the data to the buffer; if the buffer is
// full, we cut a page and then append to the newly reset buffer.
// In case we also need to backfill, there is case where the backfill fills
// the second page, that needs to be flushed again.
//
// The second iteration should never fail, as the buffer will always be empty
// The third iteration should never fail, as the buffer will always be empty
// then.
for range 2 {
for range 3 {
if cb.append(row, value) {
cb.rows = row + 1
cb.statsBuilder.Append(value)

@ -0,0 +1,120 @@
package dataset_test
import (
"crypto/rand"
"testing"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
)
func binaryValue(n int) dataset.Value {
b := make([]byte, n)
_, _ = rand.Read(b)
return dataset.BinaryValue(b)
}
func TestColumnBuilder_Append(t *testing.T) {
tests := []struct {
name string
pageSizeHint int
pageMaxRowCount int
rowsBefore []dataset.Value
rowIndex int
value dataset.Value
wantErr error
expPages int
}{
{
name: "row exceeds target size hint",
pageSizeHint: 512,
pageMaxRowCount: 0,
rowsBefore: []dataset.Value{binaryValue(128)},
rowIndex: 1,
value: binaryValue(512),
wantErr: nil,
expPages: 2,
},
{
name: "row exceeds max row count",
pageSizeHint: 0,
pageMaxRowCount: 2,
rowsBefore: []dataset.Value{binaryValue(128), binaryValue(128)},
rowIndex: 2,
value: binaryValue(128),
wantErr: nil,
expPages: 2,
},
{
name: "first not NULL row exceeds target size hint",
pageSizeHint: 2,
pageMaxRowCount: 0,
rowsBefore: nil,
rowIndex: 9,
value: binaryValue(1),
wantErr: nil,
expPages: 1, // only 1 page, because 9 NULL rows do not count anything towards estimated size and therefore no page is explicitly flushed
},
{
name: "first not NULL row exceeds max row count",
pageSizeHint: 0,
pageMaxRowCount: 1,
rowsBefore: nil,
rowIndex: 9,
value: binaryValue(1024),
wantErr: nil,
expPages: 2,
},
{
name: "backfilling exceeds max row count",
pageSizeHint: 0,
pageMaxRowCount: 5,
rowsBefore: []dataset.Value{binaryValue(128), binaryValue(128)},
rowIndex: 16,
value: binaryValue(128),
wantErr: nil,
expPages: 2,
},
{
name: "backfilling exactly matches max row count",
pageSizeHint: 0,
pageMaxRowCount: 5,
rowsBefore: []dataset.Value{binaryValue(128), binaryValue(128), binaryValue(128), binaryValue(128)},
rowIndex: 9,
value: binaryValue(128),
wantErr: nil,
expPages: 3,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
opts := dataset.BuilderOptions{
PageSizeHint: tt.pageSizeHint,
PageMaxRowCount: tt.pageMaxRowCount,
Type: dataset.ColumnType{Physical: datasetmd.PHYSICAL_TYPE_BINARY, Logical: "data"},
Compression: datasetmd.COMPRESSION_TYPE_ZSTD,
Encoding: datasetmd.ENCODING_TYPE_PLAIN,
}
cb, err := dataset.NewColumnBuilder("test", opts)
require.NoError(t, err)
for i, value := range tt.rowsBefore {
err = cb.Append(i, value)
require.NoError(t, err, "setup of exisiting rows failed")
}
err = cb.Append(tt.rowIndex, tt.value)
if tt.wantErr != nil {
require.ErrorIs(t, err, tt.wantErr)
}
col, err := cb.Flush()
require.NoError(t, err)
require.Equal(t, tt.expPages, len(col.Pages))
})
}
}

@ -85,7 +85,9 @@ func newPageBuilder(opts BuilderOptions) (*pageBuilder, error) {
// The function canAppend checks whether `n` values with a total value size of `valueSize` can be appended to the current page
// based on the options [dataobj.BuilderOptions.PageMaxRowCount] and [dataobj.BuilderOptions.PageSizeHint].
func (b *pageBuilder) canAppend(n, valueSize int) bool {
if rows := b.Rows(); b.opts.PageMaxRowCount > 0 && rows > 0 && b.Rows()+n > b.opts.PageMaxRowCount {
// In case multiple NULL values are appended (when backfilling rows), exceeding the PageMaxRowCount must be possible,
// otherwise it would never allow appending even in the 2nd iteration when the previous page was already flushed.
if b.opts.PageMaxRowCount > 0 && n <= b.opts.PageMaxRowCount && b.Rows()+n > b.opts.PageMaxRowCount {
return false
}
if sz := b.EstimatedSize(); b.opts.PageSizeHint > 0 && sz > 0 && sz+valueSize > b.opts.PageSizeHint {

Loading…
Cancel
Save