diff --git a/pkg/dataobj/internal/dataset/column_builder.go b/pkg/dataobj/internal/dataset/column_builder.go
index 0dfbde0796..31b05849b7 100644
--- a/pkg/dataobj/internal/dataset/column_builder.go
+++ b/pkg/dataobj/internal/dataset/column_builder.go
@@ -102,12 +102,14 @@ func (cb *ColumnBuilder) Append(row int, value Value) error {
 		return fmt.Errorf("row %d is older than current row %d", row, cb.rows)
 	}
 
-	// We give two attempts to append the data to the buffer; if the buffer is
+	// We give three attempts to append the data to the buffer; if the buffer is
 	// full, we cut a page and then append to the newly reset buffer.
+	// If we also need to backfill, there is a case where the backfill fills the
+	// second page, which then needs to be flushed again.
 	//
-	// The second iteration should never fail, as the buffer will always be empty
+	// The third iteration should never fail, as the buffer will always be empty
 	// then.
-	for range 2 {
+	for range 3 {
 		if cb.append(row, value) {
 			cb.rows = row + 1
 			cb.statsBuilder.Append(value)
diff --git a/pkg/dataobj/internal/dataset/column_builder_test.go b/pkg/dataobj/internal/dataset/column_builder_test.go
new file mode 100644
index 0000000000..07367b141f
--- /dev/null
+++ b/pkg/dataobj/internal/dataset/column_builder_test.go
@@ -0,0 +1,130 @@
+package dataset_test
+
+import (
+	"crypto/rand"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
+	"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
+)
+
+// binaryValue returns a binary dataset.Value wrapping n random bytes.
+func binaryValue(n int) dataset.Value {
+	b := make([]byte, n)
+	// The error is ignored on purpose: the tests below only assert page counts, not byte contents.
+	_, _ = rand.Read(b)
+	return dataset.BinaryValue(b)
+}
+
+// TestColumnBuilder_Append checks how many pages a ColumnBuilder has produced
+// after a single Append that crosses PageSizeHint or PageMaxRowCount,
+// including appends whose row index is ahead of the rows appended so far.
+func TestColumnBuilder_Append(t *testing.T) {
+	tests := []struct {
+		name            string
+		pageSizeHint    int
+		pageMaxRowCount int
+		rowsBefore      []dataset.Value
+		rowIndex        int
+		value           dataset.Value
+		wantErr         error
+		expPages        int
+	}{
+		{
+			name:            "row exceeds target size hint",
+			pageSizeHint:    512,
+			pageMaxRowCount: 0,
+			rowsBefore:      []dataset.Value{binaryValue(128)},
+			rowIndex:        1,
+			value:           binaryValue(512),
+			wantErr:         nil,
+			expPages:        2,
+		},
+		{
+			name:            "row exceeds max row count",
+			pageSizeHint:    0,
+			pageMaxRowCount: 2,
+			rowsBefore:      []dataset.Value{binaryValue(128), binaryValue(128)},
+			rowIndex:        2,
+			value:           binaryValue(128),
+			wantErr:         nil,
+			expPages:        2,
+		},
+		{
+			name:            "first not NULL row exceeds target size hint",
+			pageSizeHint:    2,
+			pageMaxRowCount: 0,
+			rowsBefore:      nil,
+			rowIndex:        9,
+			value:           binaryValue(1),
+			wantErr:         nil,
+			expPages:        1, // only 1 page, because 9 NULL rows do not count anything towards estimated size and therefore no page is explicitly flushed
+		},
+		{
+			name:            "first not NULL row exceeds max row count",
+			pageSizeHint:    0,
+			pageMaxRowCount: 1,
+			rowsBefore:      nil,
+			rowIndex:        9,
+			value:           binaryValue(1024),
+			wantErr:         nil,
+			expPages:        2,
+		},
+		{
+			name:            "backfilling exceeds max row count",
+			pageSizeHint:    0,
+			pageMaxRowCount: 5,
+			rowsBefore:      []dataset.Value{binaryValue(128), binaryValue(128)},
+			rowIndex:        16,
+			value:           binaryValue(128),
+			wantErr:         nil,
+			expPages:        2,
+		},
+		{
+			name:            "backfilling exactly matches max row count",
+			pageSizeHint:    0,
+			pageMaxRowCount: 5,
+			rowsBefore:      []dataset.Value{binaryValue(128), binaryValue(128), binaryValue(128), binaryValue(128)},
+			rowIndex:        9,
+			value:           binaryValue(128),
+			wantErr:         nil,
+			expPages:        3,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+
+			opts := dataset.BuilderOptions{
+				PageSizeHint:    tt.pageSizeHint,
+				PageMaxRowCount: tt.pageMaxRowCount,
+				Type:            dataset.ColumnType{Physical: datasetmd.PHYSICAL_TYPE_BINARY, Logical: "data"},
+				Compression:     datasetmd.COMPRESSION_TYPE_ZSTD,
+				Encoding:        datasetmd.ENCODING_TYPE_PLAIN,
+			}
+
+			cb, err := dataset.NewColumnBuilder("test", opts)
+			require.NoError(t, err)
+
+			// Seed the column with the rows that exist before the append under test.
+			for i, value := range tt.rowsBefore {
+				err = cb.Append(i, value)
+				require.NoError(t, err, "setup of exisiting rows failed")
+			}
+
+			// Append the row under test; the error is always asserted so that an
+			// unexpected failure is not hidden behind the page-count check.
+			err = cb.Append(tt.rowIndex, tt.value)
+			if tt.wantErr != nil {
+				require.ErrorIs(t, err, tt.wantErr)
+			} else {
+				require.NoError(t, err)
+			}
+
+			col, err := cb.Flush()
+			require.NoError(t, err)
+			require.Equal(t, tt.expPages, len(col.Pages))
+		})
+	}
+}
diff --git a/pkg/dataobj/internal/dataset/page_builder.go b/pkg/dataobj/internal/dataset/page_builder.go
index 9411b288b2..56200972cc 100644
--- a/pkg/dataobj/internal/dataset/page_builder.go
+++ b/pkg/dataobj/internal/dataset/page_builder.go
@@ -85,7 +85,9 @@ func newPageBuilder(opts BuilderOptions) (*pageBuilder, error) {
 // The function canAppend checks whether `n` values with a total value size of `valueSize` can be appended to the current page
 // based on the options [dataobj.BuilderOptions.PageMaxRowCount] and [dataobj.BuilderOptions.PageSizeHint].
 func (b *pageBuilder) canAppend(n, valueSize int) bool {
-	if rows := b.Rows(); b.opts.PageMaxRowCount > 0 && rows > 0 && b.Rows()+n > b.opts.PageMaxRowCount {
+	// In case multiple NULL values are appended (when backfilling rows), exceeding the PageMaxRowCount must be possible,
+	// otherwise it would never allow appending even in the 2nd iteration when the previous page was already flushed.
+	if b.opts.PageMaxRowCount > 0 && n <= b.opts.PageMaxRowCount && b.Rows()+n > b.opts.PageMaxRowCount {
 		return false
 	}
 	if sz := b.EstimatedSize(); b.opts.PageSizeHint > 0 && sz > 0 && sz+valueSize > b.opts.PageSizeHint {