mirror of https://github.com/grafana/loki
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
357 lines
9.3 KiB
357 lines
9.3 KiB
|
12 months ago
|
package dataset
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"errors"
|
||
|
|
"io"
|
||
|
|
"testing"
|
||
|
|
|
||
|
|
"github.com/stretchr/testify/require"
|
||
|
|
|
||
|
|
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
|
||
|
|
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
|
||
|
|
)
|
||
|
|
|
||
|
|
type testPerson struct {
|
||
|
|
firstName string
|
||
|
|
middleName string // May be empty
|
||
|
|
lastName string
|
||
|
|
birthYear int64
|
||
|
|
}
|
||
|
|
|
||
|
|
var basicReaderTestData = []testPerson{
|
||
|
|
{"John", "Robert", "Smith", 1980},
|
||
|
|
{"Jane", "", "Doe", 1985},
|
||
|
|
{"Alice", "Marie", "Johnson", 1990},
|
||
|
|
{"Bob", "", "Williams", 1975},
|
||
|
|
{"Carol", "Lynn", "Brown", 1982},
|
||
|
|
{"David", "", "Miller", 1988},
|
||
|
|
{"Eve", "Grace", "Davis", 1979},
|
||
|
|
{"Frank", "", "Wilson", 1983},
|
||
|
|
{"Grace", "Elizabeth", "Taylor", 1987},
|
||
|
|
{"Henry", "", "Anderson", 1981},
|
||
|
|
}
|
||
|
|
|
||
|
|
func Test_basicReader_ReadAll(t *testing.T) {
|
||
|
|
columns := buildTestColumns(t)
|
||
|
|
require.Len(t, columns, 4)
|
||
|
|
|
||
|
2 weeks ago
|
br := newBasicRowReader(columns)
|
||
|
12 months ago
|
defer br.Close()
|
||
|
|
|
||
|
2 weeks ago
|
actualRows, err := readBasicRowReader(br, 3)
|
||
|
12 months ago
|
require.NoError(t, err)
|
||
|
|
require.Equal(t, basicReaderTestData, convertToTestPersons(actualRows))
|
||
|
|
}
|
||
|
|
|
||
|
|
func Test_basicReader_ReadFromOffset(t *testing.T) {
|
||
|
|
columns := buildTestColumns(t)
|
||
|
|
require.Len(t, columns, 4)
|
||
|
|
|
||
|
2 weeks ago
|
br := newBasicRowReader(columns)
|
||
|
12 months ago
|
defer br.Close()
|
||
|
|
|
||
|
|
// Seek to row 4
|
||
|
|
_, err := br.Seek(4, io.SeekStart)
|
||
|
|
require.NoError(t, err)
|
||
|
|
|
||
|
2 weeks ago
|
actualRows, err := readBasicRowReader(br, 3)
|
||
|
12 months ago
|
require.NoError(t, err)
|
||
|
|
require.Equal(t, basicReaderTestData[4:], convertToTestPersons(actualRows))
|
||
|
|
}
|
||
|
|
|
||
|
|
func Test_basicReader_SeekToStart(t *testing.T) {
|
||
|
|
columns := buildTestColumns(t)
|
||
|
|
require.Len(t, columns, 4)
|
||
|
|
|
||
|
2 weeks ago
|
br := newBasicRowReader(columns)
|
||
|
12 months ago
|
defer br.Close()
|
||
|
|
|
||
|
|
// First read everything
|
||
|
2 weeks ago
|
_, err := readBasicRowReader(br, 3)
|
||
|
12 months ago
|
require.NoError(t, err)
|
||
|
|
|
||
|
|
// Seek back to start and read again
|
||
|
|
_, err = br.Seek(0, io.SeekStart)
|
||
|
|
require.NoError(t, err)
|
||
|
|
|
||
|
2 weeks ago
|
actualRows, err := readBasicRowReader(br, 3)
|
||
|
12 months ago
|
require.NoError(t, err)
|
||
|
|
require.Equal(t, basicReaderTestData, convertToTestPersons(actualRows))
|
||
|
|
}
|
||
|
|
|
||
|
|
func Test_basicReader_ReadColumns(t *testing.T) {
|
||
|
|
columns := buildTestColumns(t)
|
||
|
|
require.Len(t, columns, 4)
|
||
|
|
|
||
|
2 weeks ago
|
br := newBasicRowReader(columns)
|
||
|
12 months ago
|
defer br.Close()
|
||
|
|
|
||
|
|
// Read only birth_year and middle_name columns (indices 3 and 1)
|
||
|
|
subset := []Column{columns[3], columns[1]}
|
||
|
|
|
||
|
|
var (
|
||
|
|
batch = make([]Row, 3)
|
||
|
|
all []Row
|
||
|
|
)
|
||
|
|
|
||
|
|
for {
|
||
|
|
// Clear the batch before reading the next set of values. See
|
||
|
|
// implementation of readBasicReader for more details.
|
||
|
|
clear(batch)
|
||
|
|
|
||
|
|
n, err := br.ReadColumns(context.Background(), subset, batch)
|
||
|
|
for _, row := range batch[:n] {
|
||
|
|
// Verify that the row has space for all columns
|
||
|
|
require.Len(t, row.Values, 4)
|
||
|
|
|
||
|
|
// Verify that unread columns (first_name and last_name) are nil
|
||
|
|
require.True(t, row.Values[0].IsNil(), "first_name should be nil")
|
||
|
|
require.True(t, row.Values[2].IsNil(), "last_name should be nil")
|
||
|
|
|
||
|
|
// Verify that read columns match the test data
|
||
|
|
testPerson := basicReaderTestData[row.Index]
|
||
|
|
if testPerson.middleName != "" {
|
||
|
6 months ago
|
require.Equal(t, testPerson.middleName, string(row.Values[1].Binary()), "middle_name mismatch")
|
||
|
12 months ago
|
} else {
|
||
|
5 months ago
|
require.True(t, row.Values[1].IsZero(), "middle_name should be nil")
|
||
|
12 months ago
|
}
|
||
|
|
require.Equal(t, testPerson.birthYear, row.Values[3].Int64(), "birth_year mismatch")
|
||
|
|
|
||
|
|
all = append(all, row)
|
||
|
|
}
|
||
|
|
if errors.Is(err, io.EOF) {
|
||
|
|
break
|
||
|
|
}
|
||
|
|
require.NoError(t, err)
|
||
|
|
}
|
||
|
|
|
||
|
|
require.Len(t, all, len(basicReaderTestData))
|
||
|
|
}
|
||
|
|
|
||
|
|
func Test_basicReader_Fill(t *testing.T) {
|
||
|
|
columns := buildTestColumns(t)
|
||
|
|
require.Len(t, columns, 4)
|
||
|
|
|
||
|
2 weeks ago
|
br := newBasicRowReader(columns)
|
||
|
12 months ago
|
defer br.Close()
|
||
|
|
|
||
|
|
// Create rows with specific indices we want to fill
|
||
|
|
buf := []Row{
|
||
|
|
{Index: 1},
|
||
|
|
{Index: 3},
|
||
|
|
{Index: 5},
|
||
|
|
{Index: 7},
|
||
|
|
{Index: 9},
|
||
|
|
}
|
||
|
|
|
||
|
|
// Fill only the firstName column
|
||
|
|
n, err := br.Fill(context.Background(), []Column{columns[0]}, buf)
|
||
|
|
require.NoError(t, err)
|
||
|
|
require.Equal(t, len(buf), n)
|
||
|
|
|
||
|
|
// Verify the filled values
|
||
|
|
for _, row := range buf {
|
||
|
|
// Check that only firstName is filled
|
||
|
|
require.False(t, row.Values[0].IsNil(), "firstName should not be nil")
|
||
|
|
require.True(t, row.Values[1].IsNil(), "middleName should be nil")
|
||
|
|
require.True(t, row.Values[2].IsNil(), "lastName should be nil")
|
||
|
|
require.True(t, row.Values[3].IsNil(), "birthYear should be nil")
|
||
|
|
|
||
|
|
// Verify the firstName value
|
||
|
|
expectedPerson := basicReaderTestData[row.Index]
|
||
|
6 months ago
|
require.Equal(t, expectedPerson.firstName, string(row.Values[0].Binary()),
|
||
|
12 months ago
|
"firstName mismatch at index %d", row.Index)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func Test_partitionRows(t *testing.T) {
|
||
|
|
tt := []struct {
|
||
|
|
name string
|
||
|
|
in []int
|
||
|
|
expect [][]int
|
||
|
|
}{
|
||
|
|
{
|
||
|
|
name: "empty",
|
||
|
|
in: nil,
|
||
|
|
expect: nil,
|
||
|
|
},
|
||
|
|
{
|
||
|
|
name: "contiguous range",
|
||
|
|
in: []int{1, 2, 3, 4, 5},
|
||
|
|
expect: [][]int{{1, 2, 3, 4, 5}},
|
||
|
|
},
|
||
|
|
{
|
||
|
|
name: "split range",
|
||
|
|
in: []int{1, 2, 4, 5},
|
||
|
|
expect: [][]int{{1, 2}, {4, 5}},
|
||
|
|
},
|
||
|
|
{
|
||
|
|
name: "single rows",
|
||
|
|
in: []int{1, 3, 5, 7},
|
||
|
|
expect: [][]int{{1}, {3}, {5}, {7}},
|
||
|
|
},
|
||
|
|
}
|
||
|
|
|
||
|
|
for _, tc := range tt {
|
||
|
|
t.Run(tc.name, func(t *testing.T) {
|
||
|
|
rows := make([]Row, len(tc.in))
|
||
|
|
for i, idx := range tc.in {
|
||
|
|
rows[i].Index = idx
|
||
|
|
}
|
||
|
|
|
||
|
|
var actual [][]int
|
||
|
|
for part := range partitionRows(rows) {
|
||
|
|
var actualRows []int
|
||
|
|
for _, row := range part {
|
||
|
|
actualRows = append(actualRows, row.Index)
|
||
|
|
}
|
||
|
|
actual = append(actual, actualRows)
|
||
|
|
}
|
||
|
|
|
||
|
|
require.Equal(t, tc.expect, actual)
|
||
|
|
})
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func Test_basicReader_Reset(t *testing.T) {
|
||
|
|
columns := buildTestColumns(t)
|
||
|
|
require.Len(t, columns, 4)
|
||
|
|
|
||
|
2 weeks ago
|
br := newBasicRowReader(columns)
|
||
|
12 months ago
|
defer br.Close()
|
||
|
|
|
||
|
|
// First read everything
|
||
|
2 weeks ago
|
_, err := readBasicRowReader(br, 3)
|
||
|
12 months ago
|
require.NoError(t, err)
|
||
|
|
|
||
|
|
// Reset and read again
|
||
|
|
br.Reset(columns)
|
||
|
|
|
||
|
2 weeks ago
|
actualRows, err := readBasicRowReader(br, 3)
|
||
|
12 months ago
|
require.NoError(t, err)
|
||
|
|
require.Equal(t, basicReaderTestData, convertToTestPersons(actualRows))
|
||
|
|
}
|
||
|
|
|
||
|
|
// buildTestColumns creates a set of columns with test data.
|
||
|
|
func buildTestColumns(t *testing.T) []Column {
|
||
|
|
t.Helper()
|
||
|
|
|
||
|
|
_, cols := buildTestDataset(t)
|
||
|
|
return cols
|
||
|
|
}
|
||
|
|
|
||
|
|
// buildTestDataset creates a set of columns with test data.
|
||
|
|
func buildTestDataset(t *testing.T) (Dataset, []Column) {
|
||
|
|
t.Helper()
|
||
|
|
|
||
|
|
// Create builders for each column
|
||
|
|
firstNameBuilder := buildStringColumn(t, "first_name")
|
||
|
|
middleNameBuilder := buildStringColumn(t, "middle_name")
|
||
|
|
lastNameBuilder := buildStringColumn(t, "last_name")
|
||
|
|
birthYearBuilder := buildInt64Column(t, "birth_year")
|
||
|
|
|
||
|
|
// Add data to each column
|
||
|
|
for i, p := range basicReaderTestData {
|
||
|
6 months ago
|
require.NoError(t, firstNameBuilder.Append(i, BinaryValue([]byte(p.firstName))))
|
||
|
|
require.NoError(t, middleNameBuilder.Append(i, BinaryValue([]byte(p.middleName))))
|
||
|
|
require.NoError(t, lastNameBuilder.Append(i, BinaryValue([]byte(p.lastName))))
|
||
|
12 months ago
|
require.NoError(t, birthYearBuilder.Append(i, Int64Value(p.birthYear)))
|
||
|
|
}
|
||
|
|
|
||
|
|
// Flush all columns
|
||
|
|
firstName, err := firstNameBuilder.Flush()
|
||
|
|
require.NoError(t, err)
|
||
|
|
middleName, err := middleNameBuilder.Flush()
|
||
|
|
require.NoError(t, err)
|
||
|
|
lastName, err := lastNameBuilder.Flush()
|
||
|
|
require.NoError(t, err)
|
||
|
|
birthYear, err := birthYearBuilder.Flush()
|
||
|
|
require.NoError(t, err)
|
||
|
|
|
||
|
|
dset := FromMemory([]*MemColumn{firstName, middleName, lastName, birthYear})
|
||
|
|
|
||
|
|
cols, err := result.Collect(dset.ListColumns(context.Background()))
|
||
|
|
require.NoError(t, err)
|
||
|
|
|
||
|
|
return dset, cols
|
||
|
|
}
|
||
|
|
|
||
|
|
func buildStringColumn(t *testing.T, name string) *ColumnBuilder {
|
||
|
|
t.Helper()
|
||
|
|
|
||
|
|
builder, err := NewColumnBuilder(name, BuilderOptions{
|
||
|
|
PageSizeHint: 16, // Small page size to force multiple pages
|
||
|
6 months ago
|
Type: ColumnType{Physical: datasetmd.PHYSICAL_TYPE_BINARY, Logical: "string"},
|
||
|
12 months ago
|
Compression: datasetmd.COMPRESSION_TYPE_SNAPPY,
|
||
|
|
Encoding: datasetmd.ENCODING_TYPE_PLAIN,
|
||
|
|
|
||
|
|
Statistics: StatisticsOptions{StoreRangeStats: true},
|
||
|
|
})
|
||
|
|
require.NoError(t, err)
|
||
|
|
return builder
|
||
|
|
}
|
||
|
|
|
||
|
|
func buildInt64Column(t *testing.T, name string) *ColumnBuilder {
|
||
|
|
t.Helper()
|
||
|
|
|
||
|
|
builder, err := NewColumnBuilder(name, BuilderOptions{
|
||
|
|
PageSizeHint: 16, // Small page size to force multiple pages
|
||
|
6 months ago
|
Type: ColumnType{Physical: datasetmd.PHYSICAL_TYPE_INT64, Logical: "number"},
|
||
|
12 months ago
|
Compression: datasetmd.COMPRESSION_TYPE_SNAPPY,
|
||
|
|
Encoding: datasetmd.ENCODING_TYPE_DELTA,
|
||
|
|
|
||
|
|
Statistics: StatisticsOptions{StoreRangeStats: true},
|
||
|
|
})
|
||
|
|
require.NoError(t, err)
|
||
|
|
return builder
|
||
|
|
}
|
||
|
|
|
||
|
2 weeks ago
|
// readBasicRowReader reads all rows from a basicRowReader using the given batch size.
|
||
|
|
func readBasicRowReader(br *basicRowReader, batchSize int) ([]Row, error) {
|
||
|
12 months ago
|
var (
|
||
|
|
all []Row
|
||
|
|
|
||
|
|
batch = make([]Row, batchSize)
|
||
|
|
)
|
||
|
|
|
||
|
|
for {
|
||
|
|
// Clear the batch for each read; this is required to ensure that any
|
||
|
|
// memory inside Row and Value doesn't get reused.
|
||
|
|
//
|
||
|
|
// This requires any Row/Value provided by br.Read is owned by the caller
|
||
|
|
// and is not retained by the reader; if a test fails and appears to have
|
||
|
2 weeks ago
|
// memory reuse, it's likely because code in basicRowReader changed and
|
||
|
|
// broke ownership semantics.
|
||
|
12 months ago
|
clear(batch)
|
||
|
|
|
||
|
|
n, err := br.Read(context.Background(), batch)
|
||
|
|
all = append(all, batch[:n]...)
|
||
|
|
if errors.Is(err, io.EOF) {
|
||
|
|
return all, nil
|
||
|
|
} else if err != nil {
|
||
|
|
return all, err
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// convertToTestPersons converts a slice of rows to test persons.
|
||
|
|
func convertToTestPersons(rows []Row) []testPerson {
|
||
|
|
out := make([]testPerson, 0, len(rows))
|
||
|
|
|
||
|
|
for _, row := range rows {
|
||
|
|
var p testPerson
|
||
|
|
|
||
|
6 months ago
|
p.firstName = string(row.Values[0].Binary())
|
||
|
12 months ago
|
if !row.Values[1].IsNil() {
|
||
|
6 months ago
|
p.middleName = string(row.Values[1].Binary())
|
||
|
12 months ago
|
}
|
||
|
6 months ago
|
p.lastName = string(row.Values[2].Binary())
|
||
|
12 months ago
|
p.birthYear = row.Values[3].Int64()
|
||
|
|
|
||
|
|
out = append(out, p)
|
||
|
|
}
|
||
|
|
|
||
|
|
return out
|
||
|
|
}
|