chore(dataobj): order predicates based on row selectivity and cost of reading (#17376)

pull/17522/head
Ashwanth 1 year ago committed by GitHub
parent 1b87ecf7a4
commit 009600cb51
Changed files (lines changed):

  1. pkg/dataobj/internal/sections/logs/table.go (3)
  2. pkg/dataobj/logs_reader.go (21)
  3. pkg/dataobj/logs_reader_test.go (6)
  4. pkg/dataobj/predicate_order.go (228)
  5. pkg/dataobj/predicate_order_test.go (400)
  6. pkg/dataobj/querier/store.go (36)
  7. pkg/engine/compat.go (3)
  8. pkg/engine/engine.go (2)
  9. pkg/engine/engine_test.go (3)
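
The heart of the change is a cost-based ordering heuristic: each predicate gets an estimated selectivity (the fraction of rows expected to match) and an evaluation cost (uncompressed bytes of the columns it touches), and predicates are sorted by the product of the two, with provably empty predicates moved to the front. Below is a self-contained sketch of that idea; the predicate type, names, and numbers are illustrative and are not the dataobj package's own API.

package main

import (
    "fmt"
    "sort"
)

// predicate is an illustrative stand-in for dataset.Predicate: selectivity is the
// estimated fraction of matching rows (0.0-1.0), cost the uncompressed bytes that
// must be read to evaluate it.
type predicate struct {
    name        string
    selectivity float64
    cost        float64
}

// orderPredicates mirrors the heuristic in OrderPredicates: predicates proven to
// match nothing go first (the reader can prune immediately); the rest are ordered
// by selectivity * cost, cheapest and most selective first.
func orderPredicates(ps []predicate) []predicate {
    sort.Slice(ps, func(i, j int) bool {
        if ps[i].selectivity == 0 {
            return true
        }
        if ps[j].selectivity == 0 {
            return false
        }
        return ps[i].selectivity*ps[i].cost < ps[j].selectivity*ps[j].cost
    })
    return ps
}

func main() {
    ps := []predicate{
        {name: "message regexp", selectivity: 0.7, cost: 16_000_000},
        {name: "stream_id equals", selectivity: 0.01, cost: 8_000},
        {name: "timestamp range", selectivity: 0.5, cost: 8_000},
    }
    for _, p := range orderPredicates(ps) {
        fmt.Println(p.name)
    }
    // Prints: stream_id equals, timestamp range, message regexp.
}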

@@ -121,7 +121,8 @@ func (b *tableBuffer) StreamID(pageSize int) *dataset.ColumnBuilder {
Encoding: datasetmd.ENCODING_TYPE_DELTA,
Compression: datasetmd.COMPRESSION_TYPE_NONE,
Statistics: dataset.StatisticsOptions{
StoreRangeStats: true,
StoreRangeStats: true,
StoreCardinalityStats: true,
},
})
if err != nil {

@@ -42,8 +42,8 @@ type LogsReader struct {
idx int
ready bool
matchIDs map[int64]struct{}
predicate LogsPredicate
matchIDs map[int64]struct{}
predicates []LogsPredicate
buf []dataset.Row
record logs.Record
@@ -84,17 +84,17 @@ func (r *LogsReader) MatchStreams(ids iter.Seq[int64]) error {
return nil
}
// SetPredicate sets the predicate to use for filtering logs. [LogsReader.Read]
// SetPredicate sets the predicates to use for filtering logs. [LogsReader.Read]
// will only return logs for which the predicate passes.
//
// A predicate may only be set before reading begins or after a call to
// Predicates may only be set before reading begins or after a call to
// [LogsReader.Reset].
func (r *LogsReader) SetPredicate(p LogsPredicate) error {
func (r *LogsReader) SetPredicates(p []LogsPredicate) error {
if r.ready {
return fmt.Errorf("cannot change predicate after reading has started")
}
r.predicate = p
r.predicates = p
return nil
}
@@ -181,8 +181,9 @@ func (r *LogsReader) initReader(ctx context.Context) error {
if p := streamIDPredicate(maps.Keys(r.matchIDs), columns, columnDescs); p != nil {
predicates = append(predicates, p)
}
if r.predicate != nil {
if p := translateLogsPredicate(r.predicate, columns, columnDescs); p != nil {
for _, predicate := range r.predicates {
if p := translateLogsPredicate(predicate, columns, columnDescs); p != nil {
predicates = append(predicates, p)
}
}
@@ -190,7 +191,7 @@ func (r *LogsReader) initReader(ctx context.Context) error {
readerOpts := dataset.ReaderOptions{
Dataset: dset,
Columns: columns,
Predicates: predicates,
Predicates: OrderPredicates(predicates),
TargetCacheSize: 16_000_000, // Permit up to 16MB of cache pages.
}
@@ -260,7 +261,7 @@ func (r *LogsReader) Reset(obj *Object, sectionIndex int) {
r.ready = false
clear(r.matchIDs)
r.predicate = nil
r.predicates = nil
r.columns = nil
r.columnDesc = nil

@@ -114,7 +114,7 @@ func TestLogsReader_AddMetadataMatcher(t *testing.T) {
require.Equal(t, 1, md.LogsSections)
r := dataobj.NewLogsReader(obj, 0)
require.NoError(t, r.SetPredicate(dataobj.MetadataMatcherPredicate{"trace_id", "123"}))
require.NoError(t, r.SetPredicates([]dataobj.LogsPredicate{dataobj.MetadataMatcherPredicate{"trace_id", "123"}}))
actual, err := readAllRecords(context.Background(), r)
require.NoError(t, err)
@@ -139,13 +139,13 @@ func TestLogsReader_AddMetadataFilter(t *testing.T) {
require.Equal(t, 1, md.LogsSections)
r := dataobj.NewLogsReader(obj, 0)
err = r.SetPredicate(dataobj.MetadataFilterPredicate{
err = r.SetPredicates([]dataobj.LogsPredicate{dataobj.MetadataFilterPredicate{
Key: "user",
Keep: func(key, value string) bool {
require.Equal(t, "user", key)
return strings.HasPrefix(value, "1")
},
})
}})
require.NoError(t, err)
actual, err := readAllRecords(context.Background(), r)
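
Putting the reader-side API change together: callers now hand the reader a slice of predicates, and the reader orders them internally via OrderPredicates before evaluation. The following is a minimal sketch, assuming the grafana/loki module is on the import path and using only identifiers that appear in the diffs (LogsReader, SetPredicates, TimeRangePredicate, MetadataMatcherPredicate); configureReader is a hypothetical helper.

package example

import (
    "time"

    "github.com/grafana/loki/v3/pkg/dataobj"
)

// configureReader combines a time-range predicate with a metadata matcher and
// passes both to the reader as a slice; the reader applies OrderPredicates
// internally before evaluating them.
func configureReader(r *dataobj.LogsReader, start, end time.Time) error {
    return r.SetPredicates([]dataobj.LogsPredicate{
        dataobj.TimeRangePredicate[dataobj.LogsPredicate]{
            StartTime:    start,
            EndTime:      end,
            IncludeStart: true,
            IncludeEnd:   false,
        },
        dataobj.MetadataMatcherPredicate{"trace_id", "123"},
    })
}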

@@ -0,0 +1,228 @@
package dataobj
import (
"sort"
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
)
// SelectivityScore represents how selective a predicate is expected to be.
// Lower scores mean more selective (fewer matching rows).
type SelectivityScore float64
const (
noMatchSelectivity SelectivityScore = 0.0 // No match
matchAllSelectivity SelectivityScore = 1.0 // Matches all rows
)
// OrderPredicates orders predicates using a simple heuristic based on their selectivity and cost:
// - Lower selectivity (more filtering) is better
// - Lower cost of processing a row is better
//
// TODO: a permutation-based approach would be more accurate, as it would account
// for the cost of processing a predicate and for how a predicate's selectivity affects
// the row count, and hence the processing cost, of subsequent predicates.
func OrderPredicates(predicates []dataset.Predicate) []dataset.Predicate {
if len(predicates) <= 1 {
return predicates
}
sort.Slice(predicates, func(i, j int) bool {
s1 := getPredicateSelectivity(predicates[i])
s2 := getPredicateSelectivity(predicates[j])
// for selectivity of 0, we can assume there is no cost to reading the column since the reader will prune the rows
// return early in such cases to avoid cost calculations
if s1 == 0 {
return true
} else if s2 == 0 {
return false
}
return float64(s1)*float64(getRowEvaluationCost(predicates[i])) < float64(s2)*float64(getRowEvaluationCost(predicates[j]))
})
return predicates
}
// getPredicateSelectivity returns a selectivity score representing the estimated percentage
// of rows that will match (0.0 to 1.0). Lower scores mean more selective (fewer matching rows).
func getPredicateSelectivity(p dataset.Predicate) SelectivityScore {
if p == nil {
return matchAllSelectivity
}
switch p := p.(type) {
case dataset.EqualPredicate:
info := p.Column.ColumnInfo()
if info.Statistics == nil || info.Statistics.CardinalityCount == 0 {
return getBaseSelectivity(p)
}
if info.Statistics.MinValue != nil && info.Statistics.MaxValue != nil {
var minValue, maxValue dataset.Value
if e1, e2 := minValue.UnmarshalBinary(info.Statistics.MinValue), maxValue.UnmarshalBinary(info.Statistics.MaxValue); e1 == nil && e2 == nil {
if dataset.CompareValues(p.Value, minValue) < 0 || dataset.CompareValues(p.Value, maxValue) > 0 {
// no rows will match
return noMatchSelectivity
}
}
}
// For equality, estimate rows per unique value
// selectivity = rows per unique value / total rows
matchingRows := float64(info.ValuesCount) / float64(info.Statistics.CardinalityCount)
return SelectivityScore(matchingRows / float64(info.RowsCount))
case dataset.InPredicate:
info := p.Column.ColumnInfo()
if info.Statistics == nil || info.Statistics.CardinalityCount == 0 {
return getBaseSelectivity(p)
}
valuesInRange := len(p.Values)
if info.Statistics.MinValue != nil && info.Statistics.MaxValue != nil {
var minValue, maxValue dataset.Value
if e1, e2 := minValue.UnmarshalBinary(info.Statistics.MinValue), maxValue.UnmarshalBinary(info.Statistics.MaxValue); e1 == nil && e2 == nil {
valuesInRange = 0
for _, v := range p.Values {
if dataset.CompareValues(v, minValue) >= 0 && dataset.CompareValues(v, maxValue) <= 0 {
valuesInRange++
}
}
if valuesInRange == 0 {
// no rows will match
return noMatchSelectivity
}
}
}
// Similar to equality but for multiple values
matchingRows := float64(info.ValuesCount) / float64(info.Statistics.CardinalityCount)
estimatedRows := matchingRows * float64(valuesInRange)
return SelectivityScore(min(estimatedRows/float64(info.RowsCount), 1.0))
case dataset.GreaterThanPredicate:
var (
info = p.Column.ColumnInfo()
minValue, maxValue dataset.Value
)
if info.Statistics == nil || info.Statistics.MinValue == nil || info.Statistics.MaxValue == nil {
return getBaseSelectivity(p)
}
if e1, e2 := minValue.UnmarshalBinary(info.Statistics.MinValue), maxValue.UnmarshalBinary(info.Statistics.MaxValue); e1 == nil && e2 == nil {
if dataset.CompareValues(p.Value, minValue) < 0 {
return SelectivityScore(1.0)
}
if dataset.CompareValues(p.Value, maxValue) > 0 {
return SelectivityScore(0.0)
}
// Estimate percentage of rows greater than the value assuming uniform distribution
position := float64(p.Value.Int64()-minValue.Int64()) /
float64(maxValue.Int64()-minValue.Int64())
return SelectivityScore(1.0 - position)
}
return getBaseSelectivity(p)
case dataset.LessThanPredicate:
var (
info = p.Column.ColumnInfo()
minValue, maxValue dataset.Value
)
if info.Statistics == nil || info.Statistics.MinValue == nil || info.Statistics.MaxValue == nil {
return getBaseSelectivity(p)
}
if e1, e2 := minValue.UnmarshalBinary(info.Statistics.MinValue), maxValue.UnmarshalBinary(info.Statistics.MaxValue); e1 == nil && e2 == nil {
if dataset.CompareValues(p.Value, minValue) < 0 {
return SelectivityScore(0.0)
}
if dataset.CompareValues(p.Value, maxValue) > 0 {
return SelectivityScore(1.0)
}
// Estimate percentage of rows less than the value assuming uniform distribution
position := float64(p.Value.Int64()-minValue.Int64()) /
float64(maxValue.Int64()-minValue.Int64())
return SelectivityScore(position)
}
return getBaseSelectivity(p)
case dataset.NotPredicate:
s := getPredicateSelectivity(p.Inner)
return SelectivityScore(1 - float64(s)) // Invert selectivity for NOT predicates
case dataset.AndPredicate:
// In the ideal case of independent predicates, we could multiply selectivities and return s1 * s2.
// However, predicates may be on the same or correlated columns, in which case the product would
// overestimate how much the pair filters. We conservatively use min(s1, s2) instead.
return min(getPredicateSelectivity(p.Left), getPredicateSelectivity(p.Right))
case dataset.OrPredicate:
// Conservative estimate assuming there is no overlap between the returned rows
return SelectivityScore(min(getPredicateSelectivity(p.Left)+getPredicateSelectivity(p.Right), 1.0))
case dataset.FuncPredicate:
// For custom functions, we cannot use the stats to estimate selectivity or to prune the rows.
// We might want these evaluated towards the end.
return SelectivityScore(0.7)
default:
panic("unknown predicate type")
}
}
// getBaseSelectivity returns a conservative estimate when no stats are available
func getBaseSelectivity(p dataset.Predicate) SelectivityScore {
switch p.(type) {
// equality predicates are preferred
case dataset.EqualPredicate:
return SelectivityScore(0.1)
case dataset.InPredicate:
return SelectivityScore(0.3)
// range predicates are next
case dataset.GreaterThanPredicate, dataset.LessThanPredicate:
return SelectivityScore(0.5)
case dataset.FuncPredicate:
return SelectivityScore(0.7)
default:
return SelectivityScore(1.0)
}
}
// getRowEvaluationCost measures the cost of evaluating a row using the bytes that need to be processed.
func getRowEvaluationCost(p dataset.Predicate) int64 {
if p == nil {
return 0
}
// Use a map to track unique columns and their sizes
columnSizes := make(map[dataset.Column]int64)
dataset.WalkPredicate(p, func(p dataset.Predicate) bool {
switch p := p.(type) {
case dataset.EqualPredicate:
columnSizes[p.Column] = int64(p.Column.ColumnInfo().UncompressedSize)
case dataset.InPredicate:
columnSizes[p.Column] = int64(p.Column.ColumnInfo().UncompressedSize)
case dataset.GreaterThanPredicate:
columnSizes[p.Column] = int64(p.Column.ColumnInfo().UncompressedSize)
case dataset.LessThanPredicate:
columnSizes[p.Column] = int64(p.Column.ColumnInfo().UncompressedSize)
case dataset.FuncPredicate:
columnSizes[p.Column] = int64(p.Column.ColumnInfo().UncompressedSize)
}
return true
})
// Sum up the sizes of unique columns
totalSize := int64(0)
for _, size := range columnSizes {
totalSize += size
}
return totalSize
}
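
For intuition on the equality estimate above: the selectivity reduces to (ValuesCount / CardinalityCount) / RowsCount, i.e. the expected number of rows per distinct value divided by the total row count. A tiny, self-contained illustration with round numbers (the same shape as the first test case below):

package main

import "fmt"

func main() {
    // Illustrative numbers: 1000 rows, 1000 non-null values, 10 distinct values.
    rows, values, cardinality := 1000.0, 1000.0, 10.0

    matchingRows := values / cardinality // ~100 rows expected per distinct value
    selectivity := matchingRows / rows   // 100 / 1000 = 0.1
    fmt.Println(selectivity)             // prints 0.1
}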

@@ -0,0 +1,400 @@
package dataobj
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
)
func TestGetPredicateSelectivity(t *testing.T) {
tests := []struct {
name string
predicate dataset.Predicate
want SelectivityScore
}{
{
name: "Equal on low cardinality column should be less selective",
predicate: dataset.EqualPredicate{
Column: (&testColumn{
rowCount: 1000,
valueCount: 1000,
cardinality: 10,
min: 0,
max: 100,
}).ToMemColumn(t),
Value: dataset.Int64Value(50),
},
want: SelectivityScore(0.1), // expects ~100 matches out of 1000 rows
},
{
name: "Equal on high cardinality column should be more selective",
predicate: dataset.EqualPredicate{
Column: (&testColumn{
rowCount: 1000,
valueCount: 1000,
cardinality: 1000,
min: 0,
max: 1000,
}).ToMemColumn(t),
Value: dataset.Int64Value(50),
},
want: SelectivityScore(0.001), // expects ~1 match out of 1000 rows
},
{
name: "Equal with sparse data should have higher selectivity",
predicate: dataset.EqualPredicate{
Column: (&testColumn{
rowCount: 1000,
valueCount: 50,
cardinality: 10, // 5 matches out of 50 non-null values
min: 0,
max: 100,
}).ToMemColumn(t),
Value: dataset.Int64Value(50),
},
want: SelectivityScore(0.005),
},
{
name: "Equal with out of range value should have zero selectivity",
predicate: dataset.EqualPredicate{
Column: (&testColumn{
cardinality: 10,
min: 0,
max: 50,
}).ToMemColumn(t),
Value: dataset.Int64Value(100),
},
want: SelectivityScore(0.0),
},
{
name: "GreaterThan selectivity using min/max range",
predicate: dataset.GreaterThanPredicate{Column: (&testColumn{
min: 0,
max: 100,
}).ToMemColumn(t),
Value: dataset.Int64Value(70),
},
want: SelectivityScore(0.3),
},
{
name: "GreaterThan with operand greater than max value should have zero selectivity",
predicate: dataset.GreaterThanPredicate{Column: (&testColumn{
min: 0,
max: 50,
}).ToMemColumn(t),
Value: dataset.Int64Value(51),
},
want: SelectivityScore(0.0),
},
{
name: "GreaterThan with operand less than min value should match all rows",
predicate: dataset.GreaterThanPredicate{Column: (&testColumn{
min: 10,
max: 50,
}).ToMemColumn(t),
Value: dataset.Int64Value(0),
},
want: SelectivityScore(1.0),
},
{
name: "LessThan selectivity using min/max range",
predicate: dataset.LessThanPredicate{Column: (&testColumn{
min: 0,
max: 100,
}).ToMemColumn(t),
Value: dataset.Int64Value(70),
},
want: SelectivityScore(0.7),
},
{
name: "LessThan with operand less than min value should have zero selectivity",
predicate: dataset.LessThanPredicate{Column: (&testColumn{
min: 20,
max: 50,
}).ToMemColumn(t),
Value: dataset.Int64Value(10),
},
want: SelectivityScore(0.0),
},
{
name: "LessThan with operand greater than max value should match all rows",
predicate: dataset.LessThanPredicate{Column: (&testColumn{
min: 0,
max: 50,
}).ToMemColumn(t),
Value: dataset.Int64Value(60),
},
want: SelectivityScore(1.0),
},
{
name: "Not should invert the selectivity",
predicate: dataset.NotPredicate{
Inner: dataset.EqualPredicate{
Column: (&testColumn{
rowCount: 1000,
valueCount: 1000,
cardinality: 10, // 100 matches out of 1000 rows
min: 0,
max: 1000,
}).ToMemColumn(t),
Value: dataset.Int64Value(50),
},
},
want: SelectivityScore(0.9), // 1 - 0.1
},
{
name: "In should add up selectivity for valid values",
predicate: dataset.InPredicate{
Column: (&testColumn{
rowCount: 1000,
valueCount: 1000,
cardinality: 10, // 100 matches out of 1000 rows
min: 25,
max: 75,
}).ToMemColumn(t),
Values: []dataset.Value{dataset.Int64Value(20), dataset.Int64Value(50), dataset.Int64Value(60), dataset.Int64Value(80)}, // 2 values in range. ~200 matching rows
},
want: SelectivityScore(0.2), // 0.1 + 0.1
},
{
name: "And should take the minimum selectivity of the two predicates",
predicate: dataset.AndPredicate{
Left: dataset.EqualPredicate{
Column: (&testColumn{
rowCount: 1000,
valueCount: 1000,
cardinality: 10, // 100 matches out of 1000 rows
min: 0,
max: 1000,
}).ToMemColumn(t),
Value: dataset.Int64Value(50),
},
Right: dataset.EqualPredicate{
Column: (&testColumn{
rowCount: 1000,
valueCount: 1000,
cardinality: 20, // 50 matches out of 1000 rows
min: 0,
max: 1000,
}).ToMemColumn(t),
Value: dataset.Int64Value(50),
},
},
want: SelectivityScore(0.05),
},
{
name: "Or should add up selectivity for valid values",
predicate: dataset.OrPredicate{
Left: dataset.EqualPredicate{
Column: (&testColumn{
rowCount: 1000,
valueCount: 1000,
cardinality: 10, // 100 matches out of 1000 rows
min: 0,
max: 1000,
}).ToMemColumn(t),
Value: dataset.Int64Value(50),
},
Right: dataset.EqualPredicate{
Column: (&testColumn{
rowCount: 1000,
valueCount: 1000,
cardinality: 20, // 50 matches out of 1000 rows
min: 0,
max: 1000,
}).ToMemColumn(t),
Value: dataset.Int64Value(50),
},
},
want: SelectivityScore(0.15),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := getPredicateSelectivity(tt.predicate)
require.InDelta(t, float64(got), float64(tt.want), 1e-10)
})
}
}
func TestRowEvaluationCost(t *testing.T) {
uniqueColumn := (&testColumn{
size: 111,
}).ToMemColumn(t)
tests := []struct {
name string
predicate dataset.Predicate
want int64
}{
{
name: "Cost of a single column",
predicate: dataset.EqualPredicate{
Column: (&testColumn{
size: 100,
}).ToMemColumn(t),
},
want: 100,
},
{
name: "Cost of reading multiple columns",
predicate: dataset.AndPredicate{
Left: dataset.EqualPredicate{
Column: (&testColumn{
size: 100,
}).ToMemColumn(t),
},
Right: dataset.OrPredicate{
Left: dataset.EqualPredicate{
Column: (&testColumn{
size: 25,
}).ToMemColumn(t),
},
Right: dataset.EqualPredicate{
Column: (&testColumn{
size: 13,
}).ToMemColumn(t),
},
},
},
want: 100 + 25 + 13,
},
{
name: "Cost of reading multiple columns with one repeated column",
predicate: dataset.OrPredicate{
Left: dataset.EqualPredicate{
Column: (&testColumn{
size: 100,
}).ToMemColumn(t),
},
Right: dataset.AndPredicate{
Left: dataset.EqualPredicate{
Column: uniqueColumn,
},
Right: dataset.EqualPredicate{
Column: uniqueColumn,
},
},
},
want: 100 + 111,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := getRowEvaluationCost(tt.predicate)
require.Equal(t, tt.want, got)
})
}
}
func TestOrderPredicates(t *testing.T) {
equalCol1 := (&testColumn{
rowCount: 1000,
valueCount: 1000,
cardinality: 10,
size: 128,
min: 0,
max: 100,
}).ToMemColumn(t)
equalCol2 := (&testColumn{
rowCount: 1000,
valueCount: 1000,
cardinality: 10,
size: 1024,
min: 0,
max: 100,
}).ToMemColumn(t)
rangeCol := (&testColumn{
rowCount: 1000,
valueCount: 1000,
size: 128,
min: 0,
max: 100,
}).ToMemColumn(t)
// Pre-define all predicates
equalPred1 := dataset.EqualPredicate{Column: equalCol1, Value: dataset.Int64Value(50)} // selectivity 0.1, cost 128
equalPred2 := dataset.EqualPredicate{Column: equalCol2, Value: dataset.Int64Value(50)} // selectivity 0.1, cost 1024
rangePred := dataset.GreaterThanPredicate{Column: rangeCol, Value: dataset.Int64Value(50)} // selectivity 0.5, cost 128
zeroSelectPred := dataset.EqualPredicate{Column: equalCol1, Value: dataset.Int64Value(200)} // selectivity 0.0
andPred := dataset.AndPredicate{
Left: equalPred1,
Right: equalPred2,
} // selectivity 0.1, cost 1152
tests := []struct {
name string
predicates []dataset.Predicate
want []dataset.Predicate
}{
{
name: "Order by selectivity - equality before range",
predicates: []dataset.Predicate{rangePred, equalPred1},
want: []dataset.Predicate{equalPred1, rangePred},
},
{
name: "Order by cost when selectivity is similar",
predicates: []dataset.Predicate{equalPred2, equalPred1},
want: []dataset.Predicate{equalPred1, equalPred2},
},
{
name: "Zero selectivity predicates come first",
predicates: []dataset.Predicate{equalPred1, zeroSelectPred},
want: []dataset.Predicate{zeroSelectPred, equalPred1},
},
{
name: "Order by selectivity - equality before AND",
predicates: []dataset.Predicate{andPred, equalPred2},
want: []dataset.Predicate{equalPred2, andPred},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := OrderPredicates(tt.predicates)
require.Equal(t, tt.want, got)
})
}
}
func makeTestInt64Value(t *testing.T, v int64) []byte {
t.Helper()
b, err := dataset.Int64Value(v).MarshalBinary()
if err != nil {
t.Fatalf("failed to marshal int64 value %d: %v", v, err)
}
return b
}
type testColumn struct {
name string
rowCount int
valueCount int
cardinality int
size int
min int64
max int64
}
func (c *testColumn) ToMemColumn(t *testing.T) *dataset.MemColumn {
return &dataset.MemColumn{
Info: dataset.ColumnInfo{
Name: c.name,
RowsCount: c.rowCount,
ValuesCount: c.valueCount,
UncompressedSize: c.size,
Statistics: &datasetmd.Statistics{
CardinalityCount: uint64(c.cardinality),
MinValue: makeTestInt64Value(t, c.min),
MaxValue: makeTestInt64Value(t, c.max),
},
},
}
}

@@ -228,19 +228,17 @@ func selectLogs(ctx context.Context, objects []object, shard logql.Shard, req lo
}
}()
streamsPredicate := streamPredicate(selector.Matchers(), req.Start, req.End)
var logsPredicate dataobj.LogsPredicate = dataobj.TimeRangePredicate[dataobj.LogsPredicate]{
var logsPredicates []dataobj.LogsPredicate
logsPredicates = append(logsPredicates, dataobj.TimeRangePredicate[dataobj.LogsPredicate]{
StartTime: req.Start,
EndTime: req.End,
IncludeStart: true,
IncludeEnd: false,
}
})
p, expr := buildLogsPredicateFromPipeline(selector)
if p != nil {
logsPredicate = dataobj.AndPredicate[dataobj.LogsPredicate]{
Left: logsPredicate,
Right: p,
}
logsPredicates = append(logsPredicates, p)
}
req.Plan.AST = expr
@@ -254,7 +252,7 @@ func selectLogs(ctx context.Context, objects []object, shard logql.Shard, req lo
span.SetTag("object", obj.object.path)
span.SetTag("sections", len(obj.logReaders))
iterator, err := obj.selectLogs(ctx, streamsPredicate, logsPredicate, req)
iterator, err := obj.selectLogs(ctx, streamsPredicate, logsPredicates, req)
if err != nil {
return err
}
@@ -287,20 +285,18 @@ func selectSamples(ctx context.Context, objects []object, shard logql.Shard, exp
streamsPredicate := streamPredicate(selector.Matchers(), start, end)
// TODO: support more predicates and combine with log.Pipeline.
var logsPredicate dataobj.LogsPredicate = dataobj.TimeRangePredicate[dataobj.LogsPredicate]{
var logsPredicates []dataobj.LogsPredicate
logsPredicates = append(logsPredicates, dataobj.TimeRangePredicate[dataobj.LogsPredicate]{
StartTime: start,
EndTime: end,
IncludeStart: true,
IncludeEnd: false,
}
})
var predicateFromExpr dataobj.LogsPredicate
predicateFromExpr, expr = buildLogsPredicateFromSampleExpr(expr)
if predicateFromExpr != nil {
logsPredicate = dataobj.AndPredicate[dataobj.LogsPredicate]{
Left: logsPredicate,
Right: predicateFromExpr,
}
logsPredicates = append(logsPredicates, predicateFromExpr)
}
g, ctx := errgroup.WithContext(ctx)
@@ -313,7 +309,7 @@ func selectSamples(ctx context.Context, objects []object, shard logql.Shard, exp
span.SetTag("object", obj.object.path)
span.SetTag("sections", len(obj.logReaders))
iterator, err := obj.selectSamples(ctx, streamsPredicate, logsPredicate, expr)
iterator, err := obj.selectSamples(ctx, streamsPredicate, logsPredicates, expr)
if err != nil {
return err
}
@@ -456,8 +452,8 @@ func (s *shardedObject) reset() {
clear(s.streams)
}
func (s *shardedObject) selectLogs(ctx context.Context, streamsPredicate dataobj.StreamsPredicate, logsPredicate dataobj.LogsPredicate, req logql.SelectLogParams) (iter.EntryIterator, error) {
if err := s.setPredicate(streamsPredicate, logsPredicate); err != nil {
func (s *shardedObject) selectLogs(ctx context.Context, streamsPredicate dataobj.StreamsPredicate, logsPredicates []dataobj.LogsPredicate, req logql.SelectLogParams) (iter.EntryIterator, error) {
if err := s.setPredicate(streamsPredicate, logsPredicates); err != nil {
return nil, err
}
@@ -488,8 +484,8 @@ func (s *shardedObject) selectLogs(ctx context.Context, streamsPredicate dataobj
return iter.NewSortEntryIterator(iterators, req.Direction), nil
}
func (s *shardedObject) selectSamples(ctx context.Context, streamsPredicate dataobj.StreamsPredicate, logsPredicate dataobj.LogsPredicate, expr syntax.SampleExpr) (iter.SampleIterator, error) {
if err := s.setPredicate(streamsPredicate, logsPredicate); err != nil {
func (s *shardedObject) selectSamples(ctx context.Context, streamsPredicate dataobj.StreamsPredicate, logsPredicates []dataobj.LogsPredicate, expr syntax.SampleExpr) (iter.SampleIterator, error) {
if err := s.setPredicate(streamsPredicate, logsPredicates); err != nil {
return nil, err
}
@@ -527,12 +523,12 @@ func (s *shardedObject) selectSamples(ctx context.Context, streamsPredicate data
return iter.NewSortSampleIterator(iterators), nil
}
func (s *shardedObject) setPredicate(streamsPredicate dataobj.StreamsPredicate, logsPredicate dataobj.LogsPredicate) error {
func (s *shardedObject) setPredicate(streamsPredicate dataobj.StreamsPredicate, logsPredicates []dataobj.LogsPredicate) error {
if err := s.streamReader.SetPredicate(streamsPredicate); err != nil {
return err
}
for _, reader := range s.logReaders {
if err := reader.SetPredicate(logsPredicate); err != nil {
if err := reader.SetPredicates(logsPredicates); err != nil {
return err
}
}

@@ -3,10 +3,11 @@ package engine
import (
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/pkg/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logqlmodel"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/push"
)
func newResultBuilder() *resultBuilder {

@@ -79,7 +79,7 @@ func (e *QueryEngine) Execute(ctx context.Context, params logql.Params) (logqlmo
e.metrics.logicalPlanning.Observe(time.Since(t).Seconds())
durLogicalPlanning := time.Since(t)
t = time.Now() // start stopwatch for physcial planning
t = time.Now() // start stopwatch for physical planning
executionContext := physical.NewContext(ctx, e.metastore, params.Start(), params.End())
planner := physical.NewPlanner(executionContext)
plan, err := planner.Build(logicalPlan)

@@ -11,11 +11,12 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/push"
"github.com/grafana/loki/v3/pkg/engine/executor"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logqlmodel"
"github.com/grafana/loki/pkg/push"
)
func createRecord(t *testing.T, schema *arrow.Schema, data [][]interface{}) arrow.Record {
