mirror of https://github.com/grafana/loki
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
182 lines
5.5 KiB
182 lines
5.5 KiB
|
3 weeks ago
|
package indexpointers_test
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"errors"
|
||
|
|
"io"
|
||
|
|
"testing"
|
||
|
|
|
||
|
|
"github.com/apache/arrow-go/v18/arrow"
|
||
|
|
"github.com/apache/arrow-go/v18/arrow/array"
|
||
|
|
"github.com/apache/arrow-go/v18/arrow/memory"
|
||
|
|
"github.com/apache/arrow-go/v18/arrow/scalar"
|
||
|
|
"github.com/stretchr/testify/require"
|
||
|
|
|
||
|
|
"github.com/grafana/loki/v3/pkg/dataobj"
|
||
|
|
"github.com/grafana/loki/v3/pkg/dataobj/sections/indexpointers"
|
||
|
|
"github.com/grafana/loki/v3/pkg/util/arrowtest"
|
||
|
|
)
|
||
|
|
|
||
|
|
// TestReader does a basic end-to-end test over a reader with a predicate applied.
|
||
|
|
func TestReaderWithoutPredicate(t *testing.T) {
|
||
|
|
sec := buildSection(t, []indexpointers.IndexPointer{
|
||
|
|
{Path: "path1", StartTs: unixTime(10), EndTs: unixTime(20)},
|
||
|
|
{Path: "path2", StartTs: unixTime(30), EndTs: unixTime(40)},
|
||
|
|
{Path: "path3", StartTs: unixTime(50), EndTs: unixTime(60)},
|
||
|
|
})
|
||
|
|
|
||
|
|
var (
|
||
|
|
pathCol = sec.Columns()[0]
|
||
|
|
minTimestampCol = sec.Columns()[1]
|
||
|
|
maxTimestampCol = sec.Columns()[2]
|
||
|
|
)
|
||
|
|
|
||
|
|
require.Equal(t, "path", pathCol.Name)
|
||
|
|
require.Equal(t, indexpointers.ColumnTypePath, pathCol.Type)
|
||
|
|
require.Equal(t, "min_timestamp", minTimestampCol.Name)
|
||
|
|
require.Equal(t, indexpointers.ColumnTypeMinTimestamp, minTimestampCol.Type)
|
||
|
|
require.Equal(t, "max_timestamp", maxTimestampCol.Name)
|
||
|
|
require.Equal(t, indexpointers.ColumnTypeMaxTimestamp, maxTimestampCol.Type)
|
||
|
|
|
||
|
|
for _, tt := range []struct {
|
||
|
|
name string
|
||
|
|
columns []*indexpointers.Column
|
||
|
|
expected arrowtest.Rows
|
||
|
|
}{
|
||
|
|
{
|
||
|
|
name: "basic reads with selected columns",
|
||
|
|
columns: []*indexpointers.Column{pathCol},
|
||
|
|
expected: arrowtest.Rows{
|
||
|
|
{"path.path.utf8": "path1"},
|
||
|
|
{"path.path.utf8": "path2"},
|
||
|
|
{"path.path.utf8": "path3"},
|
||
|
|
},
|
||
|
|
},
|
||
|
|
{
|
||
|
|
name: "basic reads with all columns",
|
||
|
|
columns: []*indexpointers.Column{pathCol, minTimestampCol, maxTimestampCol},
|
||
|
|
expected: arrowtest.Rows{
|
||
|
|
{"path.path.utf8": "path1", "min_timestamp.min_timestamp.timestamp": unixTime(10).UTC(), "max_timestamp.max_timestamp.timestamp": unixTime(20).UTC()},
|
||
|
|
{"path.path.utf8": "path2", "min_timestamp.min_timestamp.timestamp": unixTime(30).UTC(), "max_timestamp.max_timestamp.timestamp": unixTime(40).UTC()},
|
||
|
|
{"path.path.utf8": "path3", "min_timestamp.min_timestamp.timestamp": unixTime(50).UTC(), "max_timestamp.max_timestamp.timestamp": unixTime(60).UTC()},
|
||
|
|
},
|
||
|
|
},
|
||
|
|
} {
|
||
|
|
t.Run(tt.name, func(t *testing.T) {
|
||
|
|
r := indexpointers.NewReader(indexpointers.ReaderOptions{
|
||
|
|
Columns: tt.columns,
|
||
|
|
Allocator: memory.DefaultAllocator,
|
||
|
|
Predicates: nil,
|
||
|
|
})
|
||
|
|
|
||
|
|
actualTable, err := readTable(context.Background(), r)
|
||
|
|
require.NoError(t, err)
|
||
|
|
|
||
|
|
actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable)
|
||
|
|
require.NoError(t, err, "failed to get rows from table")
|
||
|
|
require.Equal(t, tt.expected, actual)
|
||
|
|
})
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestReaderWithTimestampPredicates tests reading with timestamp predicates.
|
||
|
|
func TestReaderWithTimestampPredicates(t *testing.T) {
|
||
|
|
var (
|
||
|
|
t10 = unixTime(10)
|
||
|
|
t20 = unixTime(20)
|
||
|
|
t25 = unixTime(25)
|
||
|
|
t25s = scalar.NewTimestampScalar(arrow.Timestamp(t25.UnixNano()), &arrow.TimestampType{Unit: arrow.Nanosecond})
|
||
|
|
t30 = unixTime(30)
|
||
|
|
t40 = unixTime(40)
|
||
|
|
t50 = unixTime(50)
|
||
|
|
t55 = unixTime(55)
|
||
|
|
t55s = scalar.NewTimestampScalar(arrow.Timestamp(t55.UnixNano()), &arrow.TimestampType{Unit: arrow.Nanosecond})
|
||
|
|
t60 = unixTime(60)
|
||
|
|
)
|
||
|
|
sec := buildSection(t, []indexpointers.IndexPointer{
|
||
|
|
{Path: "path1", StartTs: t10, EndTs: t20},
|
||
|
|
{Path: "path2", StartTs: t30, EndTs: t40},
|
||
|
|
{Path: "path3", StartTs: t50, EndTs: t60},
|
||
|
|
})
|
||
|
|
|
||
|
|
var (
|
||
|
|
pathCol = sec.Columns()[0]
|
||
|
|
minTimestampCol = sec.Columns()[1]
|
||
|
|
maxTimestampCol = sec.Columns()[2]
|
||
|
|
)
|
||
|
|
|
||
|
|
r := indexpointers.NewReader(indexpointers.ReaderOptions{
|
||
|
|
Columns: []*indexpointers.Column{pathCol, minTimestampCol, maxTimestampCol},
|
||
|
|
Allocator: memory.DefaultAllocator,
|
||
|
|
Predicates: []indexpointers.Predicate{
|
||
|
|
indexpointers.WhereTimeRangeOverlapsWith(minTimestampCol, maxTimestampCol, t25s, t55s),
|
||
|
|
},
|
||
|
|
})
|
||
|
|
|
||
|
|
actualTable, err := readTable(context.Background(), r)
|
||
|
|
require.NoError(t, err)
|
||
|
|
|
||
|
|
actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable)
|
||
|
|
require.NoError(t, err)
|
||
|
|
|
||
|
|
expected := arrowtest.Rows{
|
||
|
|
{
|
||
|
|
"path.path.utf8": "path2",
|
||
|
|
"min_timestamp.min_timestamp.timestamp": t30.UTC(),
|
||
|
|
"max_timestamp.max_timestamp.timestamp": t40.UTC(),
|
||
|
|
},
|
||
|
|
{
|
||
|
|
"path.path.utf8": "path3",
|
||
|
|
"min_timestamp.min_timestamp.timestamp": t50.UTC(),
|
||
|
|
"max_timestamp.max_timestamp.timestamp": t60.UTC(),
|
||
|
|
},
|
||
|
|
}
|
||
|
|
require.Equal(t, expected, actual)
|
||
|
|
}
|
||
|
|
|
||
|
|
func buildSection(t *testing.T, ptrData []indexpointers.IndexPointer) *indexpointers.Section {
|
||
|
|
t.Helper()
|
||
|
|
|
||
|
|
sectionBuilder := indexpointers.NewBuilder(nil, 0, 2)
|
||
|
|
|
||
|
|
for _, ptr := range ptrData {
|
||
|
|
sectionBuilder.Append(ptr.Path, ptr.StartTs, ptr.EndTs)
|
||
|
|
}
|
||
|
|
|
||
|
|
objectBuilder := dataobj.NewBuilder(nil)
|
||
|
|
require.NoError(t, objectBuilder.Append(sectionBuilder))
|
||
|
|
|
||
|
|
obj, closer, err := objectBuilder.Flush()
|
||
|
|
require.NoError(t, err)
|
||
|
|
t.Cleanup(func() { closer.Close() })
|
||
|
|
|
||
|
|
sec, err := indexpointers.Open(t.Context(), obj.Sections()[0])
|
||
|
|
require.NoError(t, err)
|
||
|
|
return sec
|
||
|
|
}
|
||
|
|
|
||
|
|
func readTable(ctx context.Context, r *indexpointers.Reader) (arrow.Table, error) {
|
||
|
|
var recs []arrow.RecordBatch
|
||
|
|
|
||
|
|
for {
|
||
|
|
rec, err := r.Read(ctx, 128)
|
||
|
|
if rec != nil {
|
||
|
|
if rec.NumRows() > 0 {
|
||
|
|
recs = append(recs, rec)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
if err != nil && errors.Is(err, io.EOF) {
|
||
|
|
break
|
||
|
|
} else if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
if len(recs) == 0 {
|
||
|
|
return nil, io.EOF
|
||
|
|
}
|
||
|
|
|
||
|
|
return array.NewTableFromRecords(recs[0].Schema(), recs), nil
|
||
|
|
}
|