loki/pkg/dataobj/sections/pointers/reader_bench_test.go

package pointers_test

import (
	"context"
	"errors"
	"io"
	"slices"
	"testing"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/scalar"
	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/v3/pkg/dataobj"
	"github.com/grafana/loki/v3/pkg/dataobj/sections/pointers"
)
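
// maxStreamID bounds the distinct stream IDs written to the benchmark
// section: buildBenchSection cycles pointer stream IDs through [0, maxStreamID).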
const maxStreamID = 200

// buildBenchSection creates a section with many pointers for benchmarking.
func buildBenchSection(b *testing.B, numPointers int) *pointers.Section {
	b.Helper()

	sectionBuilder := pointers.NewBuilder(nil, 0, 2)

	// Create a diverse set of pointers with different stream IDs.
	for i := 0; i < numPointers; i++ {
		streamID := int64(i % maxStreamID) // Cycle through different stream IDs.
		path := "path/to/object"
		section := int64(i % 10)
		startTs := unixTime(int64(i * 100))
		endTs := unixTime(int64(i*100 + 50))
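		// Observe each stream at both its start and end timestamps; the
		// min/max timestamp columns filtered on below are presumably
		// derived from these observations.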
		sectionBuilder.ObserveStream(path, section, streamID, streamID, startTs, 1024)
		sectionBuilder.ObserveStream(path, section, streamID, streamID, endTs, 0)
	}

	objectBuilder := dataobj.NewBuilder(nil)
	require.NoError(b, objectBuilder.Append(sectionBuilder))

	obj, closer, err := objectBuilder.Flush()
	require.NoError(b, err)
	b.Cleanup(func() { closer.Close() })

	sec, err := pointers.Open(context.Background(), obj.Sections()[0])
	require.NoError(b, err)
	return sec
}

type readerBenchParams struct {
	name         string
	numPointers  int
	numStreamIDs int
}
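
// BenchmarkReaders compares the two read paths over identical sections: the
// typed RowReader and the columnar, Arrow-based Reader.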
func BenchmarkReaders(b *testing.B) {
	benchmarks := []readerBenchParams{
		{
			name:         "1k pointers, 10 stream IDs",
			numPointers:  1000,
			numStreamIDs: 10,
		},
		{
			name:         "1k pointers, 200 stream IDs",
			numPointers:  1000,
			numStreamIDs: 200,
		},
		{
			name:         "10k pointers, 200 stream IDs",
			numPointers:  10000,
			numStreamIDs: 200,
		},
		{
			name:         "100k pointers, 200 stream IDs",
			numPointers:  100000,
			numStreamIDs: 200,
		},
	}

	for _, bm := range benchmarks {
		b.Run(bm.name, func(b *testing.B) {
			b.Run("RowReader", func(b *testing.B) {
				benchmarkRowReader(b, bm)
			})
			b.Run("Reader", func(b *testing.B) {
				benchmarkReader(b, bm)
			})
		})
	}
}
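
// benchmarkRowReader measures the RowReader path: stream IDs are matched up
// front and a time-range row predicate filters rows as they are decoded into
// a reusable buffer.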
func benchmarkRowReader(b *testing.B, params readerBenchParams) {
	ctx := context.Background()
	sec := buildBenchSection(b, params.numPointers)

	// Prepare the stream IDs to match.
	streamIDs := make([]int64, params.numStreamIDs)
	for i := 0; i < params.numStreamIDs; i++ {
		streamIDs[i] = int64(i)
	}

	// Prepare the time-range row predicate.
	predicate := pointers.TimeRangeRowPredicate{
		Start: unixTime(0),
		End:   unixTime(100000),
	}

	b.ResetTimer()
	b.ReportAllocs()

	for range b.N {
		reader := pointers.NewRowReader(sec)
		err := reader.MatchStreams(slices.Values(streamIDs))
		require.NoError(b, err)
		err = reader.SetPredicate(predicate)
		require.NoError(b, err)

		buf := make([]pointers.SectionPointer, 128)
		totalRead := 0
		for {
			n, err := reader.Read(ctx, buf)
			totalRead += n
			if err != nil {
				if errors.Is(err, io.EOF) {
					break
				}
				require.NoError(b, err)
			}
		}
		require.NoError(b, reader.Close())

		// Ensure we actually read something.
		if totalRead == 0 {
			b.Fatal("read 0 pointers")
		}
	}
	b.StopTimer()
}
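
// benchmarkReader measures the columnar Reader path, expressing the same
// stream-ID and time-range filters as column predicates over Arrow scalars.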
func benchmarkReader(b *testing.B, params readerBenchParams) {
	ctx := context.Background()
	sec := buildBenchSection(b, params.numPointers)

	// Get all columns for reading.
	columns := sec.Columns()

	// Prepare the stream IDs to match.
	streamIDs := make([]scalar.Scalar, params.numStreamIDs)
	for i := 0; i < params.numStreamIDs; i++ {
		streamIDs[i] = scalar.NewInt64Scalar(int64(i))
	}

	var streamIDCol *pointers.Column
	for _, col := range columns {
		if col.Type == pointers.ColumnTypeStreamID {
			streamIDCol = col
			break
		}
	}
	require.NotNil(b, streamIDCol)

	// Build predicates.
	var predicates []pointers.Predicate
	predicates = append(predicates, pointers.InPredicate{
		Column: streamIDCol,
		Values: streamIDs,
	})

	var minTsCol, maxTsCol *pointers.Column
	for _, col := range columns {
		switch col.Type {
		case pointers.ColumnTypeMinTimestamp:
			minTsCol = col
		case pointers.ColumnTypeMaxTimestamp:
			maxTsCol = col
		}
	}
	require.NotNil(b, minTsCol)
	require.NotNil(b, maxTsCol)
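
	// These bounds mirror the RowReader benchmark: 100000000000000 ns equals
	// the 100,000 s passed to unixTime there. Requiring max timestamp > Start
	// and min timestamp < End keeps pointers whose range overlaps (Start, End).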
	startScalar := scalar.NewTimestampScalar(arrow.Timestamp(0), arrow.FixedWidthTypes.Timestamp_ns)
	endScalar := scalar.NewTimestampScalar(arrow.Timestamp(100000000000000), arrow.FixedWidthTypes.Timestamp_ns)
	predicates = append(predicates, pointers.AndPredicate{
		Left: pointers.GreaterThanPredicate{
			Column: maxTsCol,
			Value:  startScalar,
		},
		Right: pointers.LessThanPredicate{
			Column: minTsCol,
			Value:  endScalar,
		},
	})

	b.ResetTimer()
	b.ReportAllocs()

	opts := pointers.ReaderOptions{
		Columns:    columns,
		Predicates: predicates,
	}
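
	// The reader is constructed once and Reset on each iteration, so the
	// loop measures the steady-state cost of Reset plus Read.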
	reader := pointers.NewReader(opts)
	for range b.N {
		reader.Reset(opts)

		totalRows := int64(0)
		for {
			rec, err := reader.Read(ctx, 128)
			if rec != nil {
				totalRows += rec.NumRows()
				rec.Release()
			}
			if err != nil {
				if errors.Is(err, io.EOF) {
					break
				}
				require.NoError(b, err)
			}
		}
		require.NoError(b, reader.Close())

		// Ensure we actually read something.
		if totalRows == 0 {
			b.Fatal("read 0 rows")
		}
	}
	b.StopTimer()
}