Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/dataobj/internal/dataset/reader_basic_test.go

356 lines
9.2 KiB

package dataset
import (
"context"
"errors"
"io"
"testing"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
)
// testPerson is a single fixture record used by the basicReader tests.
//
// The fields are declared in the same order as the columns assembled by
// buildTestDataset (first_name, middle_name, last_name, birth_year); keep that
// order stable, since other code in this file relies on it.
type testPerson struct {
	// firstName is stored in the first_name column.
	firstName string

	// middleName is stored in the middle_name column. It may be empty.
	middleName string

	// lastName is stored in the last_name column.
	lastName string

	// birthYear is stored in the birth_year column.
	birthYear int64
}
// basicReaderTestData is the fixture shared by the tests in this file. It
// holds ten people; entries alternate between having a middle name and having
// an empty one so that both cases are exercised.
var basicReaderTestData = []testPerson{
	{firstName: "John", middleName: "Robert", lastName: "Smith", birthYear: 1980},
	{firstName: "Jane", middleName: "", lastName: "Doe", birthYear: 1985},
	{firstName: "Alice", middleName: "Marie", lastName: "Johnson", birthYear: 1990},
	{firstName: "Bob", middleName: "", lastName: "Williams", birthYear: 1975},
	{firstName: "Carol", middleName: "Lynn", lastName: "Brown", birthYear: 1982},
	{firstName: "David", middleName: "", lastName: "Miller", birthYear: 1988},
	{firstName: "Eve", middleName: "Grace", lastName: "Davis", birthYear: 1979},
	{firstName: "Frank", middleName: "", lastName: "Wilson", birthYear: 1983},
	{firstName: "Grace", middleName: "Elizabeth", lastName: "Taylor", birthYear: 1987},
	{firstName: "Henry", middleName: "", lastName: "Anderson", birthYear: 1981},
}
// Test_basicReader_ReadAll reads every row from a fresh basicReader in
// batches of three and checks that the decoded rows equal the fixture data.
func Test_basicReader_ReadAll(t *testing.T) {
	cols := buildTestColumns(t)
	require.Len(t, cols, 4)

	reader := newBasicReader(cols)
	defer reader.Close()

	rows, err := readBasicReader(reader, 3)
	require.NoError(t, err)

	got := convertToTestPersons(rows)
	require.Equal(t, basicReaderTestData, got)
}
// Test_basicReader_ReadFromOffset seeks to row 4 before reading and checks
// that the rows read afterwards equal the fixture data from index 4 onwards.
func Test_basicReader_ReadFromOffset(t *testing.T) {
	// startRow is both the seek target and the first expected fixture index.
	const startRow = 4

	cols := buildTestColumns(t)
	require.Len(t, cols, 4)

	reader := newBasicReader(cols)
	defer reader.Close()

	_, err := reader.Seek(startRow, io.SeekStart)
	require.NoError(t, err)

	rows, err := readBasicReader(reader, 3)
	require.NoError(t, err)

	got := convertToTestPersons(rows)
	require.Equal(t, basicReaderTestData[startRow:], got)
}
// Test_basicReader_SeekToStart drains the reader once, seeks back to row 0,
// and checks that a second full read yields the complete fixture data.
func Test_basicReader_SeekToStart(t *testing.T) {
	cols := buildTestColumns(t)
	require.Len(t, cols, 4)

	reader := newBasicReader(cols)
	defer reader.Close()

	// Initial pass: consume everything; only the error is of interest here.
	_, err := readBasicReader(reader, 3)
	require.NoError(t, err)

	// Rewind to the first row.
	_, err = reader.Seek(0, io.SeekStart)
	require.NoError(t, err)

	// Second pass: the full data set must be readable again.
	rows, err := readBasicReader(reader, 3)
	require.NoError(t, err)

	got := convertToTestPersons(rows)
	require.Equal(t, basicReaderTestData, got)
}
// Test_basicReader_ReadColumns reads only a subset of columns (birth_year and
// middle_name) in batches of three and verifies, for every returned row, that:
//
//   - the row still has a slot for each of the four dataset columns,
//   - the columns that were not requested (first_name, last_name) are nil,
//   - the requested columns match the fixture entry at the row's index.
//
// It finally checks that the number of rows read equals the fixture size.
func Test_basicReader_ReadColumns(t *testing.T) {
	columns := buildTestColumns(t)
	require.Len(t, columns, 4)

	br := newBasicReader(columns)
	defer br.Close()

	// Read only birth_year and middle_name columns (indices 3 and 1). The
	// subset is deliberately given out of dataset order.
	subset := []Column{columns[3], columns[1]}

	var (
		batch = make([]Row, 3)
		all   []Row
	)

	for {
		// Clear the batch before reading the next set of values. See
		// implementation of readBasicReader for more details.
		clear(batch)

		n, err := br.ReadColumns(context.Background(), subset, batch)

		// Rows are checked before err is inspected so that rows returned
		// alongside an error (including io.EOF) are still validated.
		for _, row := range batch[:n] {
			// Verify that the row has space for all columns.
			require.Len(t, row.Values, 4)

			// Verify that unread columns (first_name and last_name) are nil.
			require.True(t, row.Values[0].IsNil(), "first_name should be nil")
			require.True(t, row.Values[2].IsNil(), "last_name should be nil")

			// Verify that read columns match the test data. The local is named
			// expected (not testPerson) so it does not shadow the type.
			expected := basicReaderTestData[row.Index]
			if expected.middleName != "" {
				require.Equal(t, expected.middleName, string(row.Values[1].Binary()), "middle_name mismatch")
			} else {
				// The assertion checks IsZero, so the message says "zero".
				require.True(t, row.Values[1].IsZero(), "middle_name should be zero")
			}
			require.Equal(t, expected.birthYear, row.Values[3].Int64(), "birth_year mismatch")

			all = append(all, row)
		}

		if errors.Is(err, io.EOF) {
			break
		}
		require.NoError(t, err)
	}

	require.Len(t, all, len(basicReaderTestData))
}
// Test_basicReader_Fill asks the reader to fill only the first_name column
// for a sparse set of row indices (1, 3, 5, 7, 9) and verifies that every
// requested row was filled, that first_name matches the fixture data, and
// that the three other columns remain nil.
func Test_basicReader_Fill(t *testing.T) {
	cols := buildTestColumns(t)
	require.Len(t, cols, 4)

	reader := newBasicReader(cols)
	defer reader.Close()

	// Pre-seed the buffer with the indices of the rows to fill.
	wantIndices := []int{1, 3, 5, 7, 9}
	rows := make([]Row, len(wantIndices))
	for i, idx := range wantIndices {
		rows[i].Index = idx
	}

	// Request just the firstName column.
	firstNameOnly := []Column{cols[0]}
	filled, err := reader.Fill(context.Background(), firstNameOnly, rows)
	require.NoError(t, err)
	require.Equal(t, len(rows), filled)

	for i := range rows {
		row := rows[i]

		// Only firstName may be populated.
		require.False(t, row.Values[0].IsNil(), "firstName should not be nil")
		require.True(t, row.Values[1].IsNil(), "middleName should be nil")
		require.True(t, row.Values[2].IsNil(), "lastName should be nil")
		require.True(t, row.Values[3].IsNil(), "birthYear should be nil")

		// The populated value must match the fixture at the same index.
		want := basicReaderTestData[row.Index].firstName
		got := string(row.Values[0].Binary())
		require.Equal(t, want, got, "firstName mismatch at index %d", row.Index)
	}
}
// Test_partitionRows is a table-driven test for partitionRows. Each case
// supplies a list of row indices and the groups of indices that
// partitionRows is expected to yield for rows carrying those indices.
func Test_partitionRows(t *testing.T) {
	type testCase struct {
		name    string
		indices []int   // Row.Index values fed to partitionRows, in order.
		want    [][]int // Expected groups of indices, in yield order.
	}

	cases := []testCase{
		{name: "empty", indices: nil, want: nil},
		{
			name:    "contiguous range",
			indices: []int{1, 2, 3, 4, 5},
			want:    [][]int{{1, 2, 3, 4, 5}},
		},
		{
			name:    "split range",
			indices: []int{1, 2, 4, 5},
			want:    [][]int{{1, 2}, {4, 5}},
		},
		{
			name:    "single rows",
			indices: []int{1, 3, 5, 7},
			want:    [][]int{{1}, {3}, {5}, {7}},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Build rows that carry nothing but the index under test.
			rows := make([]Row, len(tc.indices))
			for i := range rows {
				rows[i].Index = tc.indices[i]
			}

			// got must start out nil (not an empty slice) so that the
			// "empty" case compares equal to its nil expectation.
			var got [][]int
			for part := range partitionRows(rows) {
				var group []int
				for _, row := range part {
					group = append(group, row.Index)
				}
				got = append(got, group)
			}

			require.Equal(t, tc.want, got)
		})
	}
}
// Test_basicReader_Reset drains the reader once, calls Reset with the same
// columns, and checks that a second full read yields the complete fixture
// data.
func Test_basicReader_Reset(t *testing.T) {
	cols := buildTestColumns(t)
	require.Len(t, cols, 4)

	reader := newBasicReader(cols)
	defer reader.Close()

	// Initial pass: consume everything; only the error is of interest here.
	_, err := readBasicReader(reader, 3)
	require.NoError(t, err)

	// Reset onto the same columns, then read everything again.
	reader.Reset(cols)

	rows, err := readBasicReader(reader, 3)
	require.NoError(t, err)

	got := convertToTestPersons(rows)
	require.Equal(t, basicReaderTestData, got)
}
// buildTestColumns returns only the columns of the dataset produced by
// buildTestDataset, discarding the dataset itself.
func buildTestColumns(t *testing.T) []Column {
	t.Helper()

	_, columns := buildTestDataset(t)
	return columns
}
// buildTestDataset builds an in-memory dataset from basicReaderTestData and
// returns it together with its listed columns.
//
// The dataset has four columns, in this order: first_name, middle_name,
// last_name (string columns) and birth_year (an int64 column). Any failure
// while appending, flushing, or listing columns fails the test immediately.
func buildTestDataset(t *testing.T) (Dataset, []Column) {
	t.Helper()

	// One builder per column.
	var (
		firstNames  = buildStringColumn(t, "first_name")
		middleNames = buildStringColumn(t, "middle_name")
		lastNames   = buildStringColumn(t, "last_name")
		birthYears  = buildInt64Column(t, "birth_year")
	)

	// Append every fixture record, using its slice position as the row index.
	for row, person := range basicReaderTestData {
		require.NoError(t, firstNames.Append(row, BinaryValue([]byte(person.firstName))))
		require.NoError(t, middleNames.Append(row, BinaryValue([]byte(person.middleName))))
		require.NoError(t, lastNames.Append(row, BinaryValue([]byte(person.lastName))))
		require.NoError(t, birthYears.Append(row, Int64Value(person.birthYear)))
	}

	// flush finalizes one builder and fails the test if flushing errors.
	flush := func(b *ColumnBuilder) *MemColumn {
		t.Helper()

		col, err := b.Flush()
		require.NoError(t, err)
		return col
	}

	// Elements are evaluated left to right, so builders are flushed in
	// column order.
	dset := FromMemory([]*MemColumn{
		flush(firstNames),
		flush(middleNames),
		flush(lastNames),
		flush(birthYears),
	})

	columns, err := result.Collect(dset.ListColumns(context.Background()))
	require.NoError(t, err)

	return dset, columns
}
// buildStringColumn returns a ColumnBuilder named name, configured for
// binary physical values with the logical type "string", plain encoding,
// Snappy compression, and range statistics enabled. The test fails if the
// builder cannot be created.
func buildStringColumn(t *testing.T, name string) *ColumnBuilder {
	t.Helper()

	opts := BuilderOptions{
		// Small page size to force multiple pages.
		PageSizeHint: 16,

		Type: ColumnType{
			Physical: datasetmd.PHYSICAL_TYPE_BINARY,
			Logical:  "string",
		},
		Compression: datasetmd.COMPRESSION_TYPE_SNAPPY,
		Encoding:    datasetmd.ENCODING_TYPE_PLAIN,
		Statistics:  StatisticsOptions{StoreRangeStats: true},
	}

	b, err := NewColumnBuilder(name, opts)
	require.NoError(t, err)
	return b
}
// buildInt64Column returns a ColumnBuilder named name, configured for int64
// physical values with the logical type "number", delta encoding, Snappy
// compression, and range statistics enabled. The test fails if the builder
// cannot be created.
func buildInt64Column(t *testing.T, name string) *ColumnBuilder {
	t.Helper()

	opts := BuilderOptions{
		// Small page size to force multiple pages.
		PageSizeHint: 16,

		Type: ColumnType{
			Physical: datasetmd.PHYSICAL_TYPE_INT64,
			Logical:  "number",
		},
		Compression: datasetmd.COMPRESSION_TYPE_SNAPPY,
		Encoding:    datasetmd.ENCODING_TYPE_DELTA,
		Statistics:  StatisticsOptions{StoreRangeStats: true},
	}

	b, err := NewColumnBuilder(name, opts)
	require.NoError(t, err)
	return b
}
// readBasicReader drains br by calling Read repeatedly with a buffer of
// batchSize rows and returns every row it received.
//
// io.EOF ends the loop and is reported as a nil error. Any other error is
// returned together with the rows collected up to that point.
func readBasicReader(br *basicReader, batchSize int) ([]Row, error) {
	ctx := context.Background()

	buf := make([]Row, batchSize)
	var rows []Row

	for {
		// Zero the buffer ahead of every read so that no memory held inside a
		// Row or Value from the previous batch is handed back for reuse.
		//
		// This depends on rows and values produced by br.Read being owned by
		// the caller and not retained by the reader. A test failure that
		// looks like memory reuse most likely means basicReader changed and
		// broke those ownership semantics.
		clear(buf)

		n, err := br.Read(ctx, buf)

		// Keep whatever was read before looking at err.
		rows = append(rows, buf[:n]...)

		switch {
		case errors.Is(err, io.EOF):
			return rows, nil
		case err != nil:
			return rows, err
		}
	}
}
// convertToTestPersons maps each row onto a testPerson, reading Values[0]
// through Values[3] as first name, middle name, last name, and birth year
// respectively. A nil middle-name value leaves middleName as the empty
// string.
func convertToTestPersons(rows []Row) []testPerson {
	persons := make([]testPerson, len(rows))

	for i := range rows {
		values := rows[i].Values
		person := &persons[i]

		person.firstName = string(values[0].Binary())

		// The middle name is optional; only decode it when present.
		if !values[1].IsNil() {
			person.middleName = string(values[1].Binary())
		}

		person.lastName = string(values[2].Binary())
		person.birthYear = values[3].Int64()
	}

	return persons
}