mirror of https://github.com/grafana/loki
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
240 lines
5.1 KiB
240 lines
5.1 KiB
|
4 weeks ago
|
package pointers_test
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"errors"
|
||
|
|
"io"
|
||
|
|
"slices"
|
||
|
|
"testing"
|
||
|
|
|
||
|
|
"github.com/apache/arrow-go/v18/arrow"
|
||
|
|
"github.com/apache/arrow-go/v18/arrow/scalar"
|
||
|
|
"github.com/stretchr/testify/require"
|
||
|
|
|
||
|
|
"github.com/grafana/loki/v3/pkg/dataobj"
|
||
|
|
"github.com/grafana/loki/v3/pkg/dataobj/sections/pointers"
|
||
|
|
)
|
||
|
|
|
||
|
|
const maxStreamID = 200
|
||
|
|
|
||
|
|
// buildBenchSection creates a section with many pointers for benchmarking
|
||
|
|
func buildBenchSection(b *testing.B, numPointers int) *pointers.Section {
|
||
|
|
b.Helper()
|
||
|
|
|
||
|
|
sectionBuilder := pointers.NewBuilder(nil, 0, 2)
|
||
|
|
|
||
|
|
// Create diverse set of pointers with different stream IDs
|
||
|
|
for i := 0; i < numPointers; i++ {
|
||
|
|
streamID := int64(i % maxStreamID) // Cycle through different stream IDs
|
||
|
|
path := "path/to/object"
|
||
|
|
section := int64(i % 10)
|
||
|
|
startTs := unixTime(int64(i * 100))
|
||
|
|
endTs := unixTime(int64(i*100 + 50))
|
||
|
|
|
||
|
|
sectionBuilder.ObserveStream(path, section, streamID, streamID, startTs, 1024)
|
||
|
|
sectionBuilder.ObserveStream(path, section, streamID, streamID, endTs, 0)
|
||
|
|
}
|
||
|
|
|
||
|
|
objectBuilder := dataobj.NewBuilder(nil)
|
||
|
|
require.NoError(b, objectBuilder.Append(sectionBuilder))
|
||
|
|
|
||
|
|
obj, closer, err := objectBuilder.Flush()
|
||
|
|
require.NoError(b, err)
|
||
|
|
b.Cleanup(func() { closer.Close() })
|
||
|
|
|
||
|
|
sec, err := pointers.Open(context.Background(), obj.Sections()[0])
|
||
|
|
require.NoError(b, err)
|
||
|
|
return sec
|
||
|
|
}
|
||
|
|
|
||
|
|
// readerBenchParams describes a single case of BenchmarkReaders.
type readerBenchParams struct {
	// name labels the sub-benchmark for this case.
	name string

	// numPointers is how many pointers buildBenchSection writes into the
	// section under test. numStreamIDs is how many stream IDs (0 through
	// numStreamIDs-1) the readers are asked to match.
	numPointers, numStreamIDs int
}
|
||
|
|
|
||
|
|
func BenchmarkReaders(b *testing.B) {
|
||
|
|
benchmarks := []readerBenchParams{
|
||
|
|
{
|
||
|
|
name: "1k pointers, 10 stream IDs",
|
||
|
|
numPointers: 1000,
|
||
|
|
numStreamIDs: 10,
|
||
|
|
},
|
||
|
|
{
|
||
|
|
name: "1k pointers, 200 stream IDs",
|
||
|
|
numPointers: 1000,
|
||
|
|
numStreamIDs: 200,
|
||
|
|
},
|
||
|
|
{
|
||
|
|
name: "10k pointers, 200 stream IDs",
|
||
|
|
numPointers: 10000,
|
||
|
|
numStreamIDs: 200,
|
||
|
|
},
|
||
|
|
{
|
||
|
|
name: "100k pointers, 200 stream IDs",
|
||
|
|
numPointers: 100000,
|
||
|
|
numStreamIDs: 200,
|
||
|
|
},
|
||
|
|
}
|
||
|
|
|
||
|
|
for _, bm := range benchmarks {
|
||
|
|
b.Run(bm.name, func(b *testing.B) {
|
||
|
|
b.Run("RowReader", func(b *testing.B) {
|
||
|
|
benchmarkRowReader(b, bm)
|
||
|
|
})
|
||
|
|
b.Run("Reader", func(b *testing.B) {
|
||
|
|
benchmarkReader(b, bm)
|
||
|
|
})
|
||
|
|
})
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func benchmarkRowReader(b *testing.B, params readerBenchParams) {
|
||
|
|
ctx := context.Background()
|
||
|
|
sec := buildBenchSection(b, params.numPointers)
|
||
|
|
|
||
|
|
// Prepare stream IDs to match
|
||
|
|
streamIDs := make([]int64, params.numStreamIDs)
|
||
|
|
for i := 0; i < params.numStreamIDs; i++ {
|
||
|
|
streamIDs[i] = int64(i)
|
||
|
|
}
|
||
|
|
|
||
|
|
// Prepare predicate if needed
|
||
|
|
predicate := pointers.TimeRangeRowPredicate{
|
||
|
|
Start: unixTime(0),
|
||
|
|
End: unixTime(100000),
|
||
|
|
}
|
||
|
|
|
||
|
|
b.ResetTimer()
|
||
|
|
b.ReportAllocs()
|
||
|
|
|
||
|
|
for range b.N {
|
||
|
|
reader := pointers.NewRowReader(sec)
|
||
|
|
err := reader.MatchStreams(slices.Values(streamIDs))
|
||
|
|
require.NoError(b, err)
|
||
|
|
|
||
|
|
err = reader.SetPredicate(predicate)
|
||
|
|
require.NoError(b, err)
|
||
|
|
|
||
|
|
buf := make([]pointers.SectionPointer, 128)
|
||
|
|
totalRead := 0
|
||
|
|
|
||
|
|
for {
|
||
|
|
n, err := reader.Read(ctx, buf)
|
||
|
|
totalRead += n
|
||
|
|
|
||
|
|
if err != nil {
|
||
|
|
if errors.Is(err, io.EOF) {
|
||
|
|
break
|
||
|
|
}
|
||
|
|
require.NoError(b, err)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
require.NoError(b, reader.Close())
|
||
|
|
|
||
|
|
// Ensure we actually read something
|
||
|
|
if totalRead == 0 {
|
||
|
|
b.Fatal("read 0 pointers")
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
b.StopTimer()
|
||
|
|
}
|
||
|
|
|
||
|
|
func benchmarkReader(b *testing.B, params readerBenchParams) {
|
||
|
|
ctx := context.Background()
|
||
|
|
sec := buildBenchSection(b, params.numPointers)
|
||
|
|
|
||
|
|
// Get all columns for reading
|
||
|
|
columns := sec.Columns()
|
||
|
|
|
||
|
|
// Prepare stream IDs to match
|
||
|
|
streamIDs := make([]scalar.Scalar, params.numStreamIDs)
|
||
|
|
for i := 0; i < params.numStreamIDs; i++ {
|
||
|
|
streamIDs[i] = scalar.NewInt64Scalar(int64(i))
|
||
|
|
}
|
||
|
|
|
||
|
|
var streamIDCol *pointers.Column
|
||
|
|
for _, col := range columns {
|
||
|
|
if col.Type == pointers.ColumnTypeStreamID {
|
||
|
|
streamIDCol = col
|
||
|
|
break
|
||
|
|
}
|
||
|
|
}
|
||
|
|
require.NotNil(b, streamIDCol)
|
||
|
|
|
||
|
|
// Build predicates
|
||
|
|
var predicates []pointers.Predicate
|
||
|
|
|
||
|
|
predicates = append(predicates, pointers.InPredicate{
|
||
|
|
Column: streamIDCol,
|
||
|
|
Values: streamIDs,
|
||
|
|
})
|
||
|
|
|
||
|
|
var minTsCol, maxTsCol *pointers.Column
|
||
|
|
for _, col := range columns {
|
||
|
|
switch col.Type {
|
||
|
|
case pointers.ColumnTypeMinTimestamp:
|
||
|
|
minTsCol = col
|
||
|
|
case pointers.ColumnTypeMaxTimestamp:
|
||
|
|
maxTsCol = col
|
||
|
|
}
|
||
|
|
}
|
||
|
|
require.NotNil(b, minTsCol)
|
||
|
|
require.NotNil(b, maxTsCol)
|
||
|
|
|
||
|
|
startScalar := scalar.NewTimestampScalar(arrow.Timestamp(0), arrow.FixedWidthTypes.Timestamp_ns)
|
||
|
|
endScalar := scalar.NewTimestampScalar(arrow.Timestamp(100000000000000), arrow.FixedWidthTypes.Timestamp_ns)
|
||
|
|
|
||
|
|
predicates = append(predicates, pointers.AndPredicate{
|
||
|
|
Left: pointers.GreaterThanPredicate{
|
||
|
|
Column: maxTsCol,
|
||
|
|
Value: startScalar,
|
||
|
|
},
|
||
|
|
Right: pointers.LessThanPredicate{
|
||
|
|
Column: minTsCol,
|
||
|
|
Value: endScalar,
|
||
|
|
},
|
||
|
|
})
|
||
|
|
|
||
|
|
b.ResetTimer()
|
||
|
|
b.ReportAllocs()
|
||
|
|
|
||
|
|
opts := pointers.ReaderOptions{
|
||
|
|
Columns: columns,
|
||
|
|
Predicates: predicates,
|
||
|
|
}
|
||
|
|
|
||
|
|
reader := pointers.NewReader(opts)
|
||
|
|
|
||
|
|
for range b.N {
|
||
|
|
reader.Reset(opts)
|
||
|
|
totalRows := int64(0)
|
||
|
|
|
||
|
|
for {
|
||
|
|
rec, err := reader.Read(ctx, 128)
|
||
|
|
if rec != nil {
|
||
|
|
totalRows += rec.NumRows()
|
||
|
|
rec.Release()
|
||
|
|
}
|
||
|
|
|
||
|
|
if err != nil {
|
||
|
|
if errors.Is(err, io.EOF) {
|
||
|
|
break
|
||
|
|
}
|
||
|
|
require.NoError(b, err)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
require.NoError(b, reader.Close())
|
||
|
|
|
||
|
|
// Ensure we actually read something
|
||
|
|
if totalRows == 0 {
|
||
|
|
b.Fatal("read 0 rows")
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
b.StopTimer()
|
||
|
|
}
|