chore(dataobj): pass down line filters to logs reader (#16802)

pull/16847/head
Ashwanth 2 months ago committed by GitHub
parent f53bc96e1c
commit c51fa39684
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 20
      pkg/dataobj/logs_reader.go
  2. 40
      pkg/dataobj/predicate.go
  3. 105
      pkg/dataobj/querier/store.go
  4. 233
      pkg/dataobj/querier/store_test.go
  5. 2
      pkg/engine/planner/internal/tree/printer.go
  6. 10
      pkg/engine/planner/physical/expressions.go
  7. 4
      pkg/limits/ingest_limits_test.go

@ -296,6 +296,26 @@ func translateLogsPredicate(p LogsPredicate, columns []dataset.Column, columnDes
}
return convertLogsTimePredicate(p, timeColumn)
case LogMessageFilterPredicate:
messageColumn := findColumnFromDesc(columns, columnDesc, func(desc *logsmd.ColumnDesc) bool {
return desc.Type == logsmd.COLUMN_TYPE_MESSAGE
})
if messageColumn == nil {
return dataset.FalsePredicate{}
}
return dataset.FuncPredicate{
Column: messageColumn,
Keep: func(_ dataset.Column, value dataset.Value) bool {
if value.Type() == datasetmd.VALUE_TYPE_STRING {
// To handle older dataobjs that still use string type for message column. This can be removed in future.
return p.Keep([]byte(value.String()))
}
return p.Keep(value.ByteArray())
},
}
case MetadataMatcherPredicate:
metadataColumn := findColumnFromDesc(columns, columnDesc, func(desc *logsmd.ColumnDesc) bool {
return desc.Type == logsmd.COLUMN_TYPE_METADATA && desc.Info.Name == p.Key

@ -65,6 +65,12 @@ type (
Keep func(name, value string) bool
}
// A LogMessageFilterPredicate is a [LogsPredicate] that requires the log message
// of the entry to pass a Keep function.
LogMessageFilterPredicate struct {
Keep func(line []byte) bool
}
// A MetadataMatcherPredicate is a [LogsPredicate] that requires a metadata
// key named Key to exist with a value of Value.
MetadataMatcherPredicate struct{ Key, Value string }
@ -84,20 +90,22 @@ type (
}
)
func (AndPredicate[P]) isPredicate() {}
func (OrPredicate[P]) isPredicate() {}
func (NotPredicate[P]) isPredicate() {}
func (TimeRangePredicate[P]) isPredicate() {}
func (LabelMatcherPredicate) isPredicate() {}
func (LabelFilterPredicate) isPredicate() {}
func (MetadataMatcherPredicate) isPredicate() {}
func (MetadataFilterPredicate) isPredicate() {}
func (AndPredicate[P]) isPredicate() {}
func (OrPredicate[P]) isPredicate() {}
func (NotPredicate[P]) isPredicate() {}
func (TimeRangePredicate[P]) isPredicate() {}
func (LabelMatcherPredicate) isPredicate() {}
func (LabelFilterPredicate) isPredicate() {}
func (MetadataMatcherPredicate) isPredicate() {}
func (MetadataFilterPredicate) isPredicate() {}
func (LogMessageFilterPredicate) isPredicate() {}
func (AndPredicate[P]) predicateKind(P) {}
func (OrPredicate[P]) predicateKind(P) {}
func (NotPredicate[P]) predicateKind(P) {}
func (TimeRangePredicate[P]) predicateKind(P) {}
func (LabelMatcherPredicate) predicateKind(StreamsPredicate) {}
func (LabelFilterPredicate) predicateKind(StreamsPredicate) {}
func (MetadataMatcherPredicate) predicateKind(LogsPredicate) {}
func (MetadataFilterPredicate) predicateKind(LogsPredicate) {}
func (AndPredicate[P]) predicateKind(P) {}
func (OrPredicate[P]) predicateKind(P) {}
func (NotPredicate[P]) predicateKind(P) {}
func (TimeRangePredicate[P]) predicateKind(P) {}
func (LabelMatcherPredicate) predicateKind(StreamsPredicate) {}
func (LabelFilterPredicate) predicateKind(StreamsPredicate) {}
func (MetadataMatcherPredicate) predicateKind(LogsPredicate) {}
func (MetadataFilterPredicate) predicateKind(LogsPredicate) {}
func (LogMessageFilterPredicate) predicateKind(LogsPredicate) {}

@ -228,13 +228,22 @@ func selectLogs(ctx context.Context, objects []object, shard logql.Shard, req lo
}
}()
streamsPredicate := streamPredicate(selector.Matchers(), req.Start, req.End)
// TODO: support more predicates and combine with log.Pipeline.
logsPredicate := dataobj.TimeRangePredicate[dataobj.LogsPredicate]{
var logsPredicate dataobj.LogsPredicate = dataobj.TimeRangePredicate[dataobj.LogsPredicate]{
StartTime: req.Start,
EndTime: req.End,
IncludeStart: true,
IncludeEnd: false,
}
p, expr := buildLogsPredicateFromPipeline(selector)
if p != nil {
logsPredicate = dataobj.AndPredicate[dataobj.LogsPredicate]{
Left: logsPredicate,
Right: p,
}
}
req.Plan.AST = expr
g, ctx := errgroup.WithContext(ctx)
iterators := make([]iter.EntryIterator, len(shardedObjects))
@ -278,13 +287,22 @@ func selectSamples(ctx context.Context, objects []object, shard logql.Shard, exp
streamsPredicate := streamPredicate(selector.Matchers(), start, end)
// TODO: support more predicates and combine with log.Pipeline.
logsPredicate := dataobj.TimeRangePredicate[dataobj.LogsPredicate]{
var logsPredicate dataobj.LogsPredicate = dataobj.TimeRangePredicate[dataobj.LogsPredicate]{
StartTime: start,
EndTime: end,
IncludeStart: true,
IncludeEnd: false,
}
var predicateFromExpr dataobj.LogsPredicate
predicateFromExpr, expr = buildLogsPredicateFromSampleExpr(expr)
if predicateFromExpr != nil {
logsPredicate = dataobj.AndPredicate[dataobj.LogsPredicate]{
Left: logsPredicate,
Right: predicateFromExpr,
}
}
g, ctx := errgroup.WithContext(ctx)
iterators := make([]iter.SampleIterator, len(shardedObjects))
@ -632,3 +650,84 @@ func parseShards(shards []string) (logql.Shard, error) {
}
return parsed[0], nil
}
func buildLogsPredicateFromSampleExpr(expr syntax.SampleExpr) (dataobj.LogsPredicate, syntax.SampleExpr) {
var (
predicate dataobj.LogsPredicate
skip bool
)
expr.Walk(func(e syntax.Expr) {
switch e := e.(type) {
case *syntax.BinOpExpr:
// we might not encounter BinOpExpr at this point since the lhs and rhs are evaluated separately?
skip = true
return
case *syntax.RangeAggregationExpr:
if skip {
return
}
predicate, e.Left.Left = buildLogsPredicateFromPipeline(e.Left.Left)
}
})
return predicate, expr
}
func buildLogsPredicateFromPipeline(expr syntax.LogSelectorExpr) (dataobj.LogsPredicate, syntax.LogSelectorExpr) {
// Check if expr is a PipelineExpr, other implementations have no stages
pipelineExpr, ok := expr.(*syntax.PipelineExpr)
if !ok {
return nil, expr
}
var (
predicate dataobj.LogsPredicate
remainingStages = make([]syntax.StageExpr, 0, len(pipelineExpr.MultiStages))
appendPredicate = func(p dataobj.LogsPredicate) {
if predicate == nil {
predicate = p
} else {
predicate = dataobj.AndPredicate[dataobj.LogsPredicate]{
Left: predicate,
Right: p,
}
}
}
)
Outer:
for i, stage := range pipelineExpr.MultiStages {
switch s := stage.(type) {
case *syntax.LineFmtExpr:
// modifies the log line, break early as we cannot apply any more predicates
remainingStages = append(remainingStages, pipelineExpr.MultiStages[i:]...)
break Outer
case *syntax.LineFilterExpr:
// Convert the line filter to a predicate
f, err := s.Filter()
if err != nil {
remainingStages = append(remainingStages, s)
continue
}
// Create a line filter predicate
appendPredicate(dataobj.LogMessageFilterPredicate{
Keep: func(line []byte) bool {
return f.Filter(line)
},
})
default:
remainingStages = append(remainingStages, s)
}
}
if len(remainingStages) == 0 {
return predicate, pipelineExpr.Left // return MatchersExpr
}
pipelineExpr.MultiStages = remainingStages
return predicate, pipelineExpr
}

@ -163,6 +163,18 @@ func TestStore_SelectSamples(t *testing.T) {
{Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(15 * time.Second).UnixNano(), Value: 1}},
},
},
{
name: "select all samples in range with multiple line filters",
selector: `rate({app=~".+"} != "bar" |~ "foo[3-6]"[1h])`,
start: now,
end: now.Add(time.Hour),
want: []sampleWithLabels{
{Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(10 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(20 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(45 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(50 * time.Second).UnixNano(), Value: 1}},
},
},
}
for _, tt := range tests {
@ -315,6 +327,20 @@ func TestStore_SelectLogs(t *testing.T) {
{Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(15 * time.Second), Line: "bar2"}},
},
},
{
name: "select with multiple line filters",
selector: `{app=~".+"} != "bar" |~ "foo[3-6]"`,
start: now,
end: now.Add(time.Hour),
limit: 100,
direction: logproto.FORWARD,
want: []entryWithLabels{
{Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(10 * time.Second), Line: "foo5"}},
{Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(20 * time.Second), Line: "foo6"}},
{Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(45 * time.Second), Line: "foo3"}},
{Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(50 * time.Second), Line: "foo4"}},
},
},
}
for _, tt := range tests {
@ -780,7 +806,7 @@ func TestShardSections(t *testing.T) {
// Verify each section is assigned exactly once
for metaIdx, meta := range tt.metadatas {
for section := 0; section < meta.LogsSections; section++ {
for section := range meta.LogsSections {
key := sectionKey{metaIdx: metaIdx, section: section}
count := sectionCounts[key]
require.Equal(t, 1, count, "section %d in metadata %d was assigned %d times", section, metaIdx, count)
@ -790,3 +816,208 @@ func TestShardSections(t *testing.T) {
})
}
}
func TestBuildLogsPredicateFromPipeline(t *testing.T) {
var evalPredicate func(p dataobj.Predicate, item []byte) bool
evalPredicate = func(p dataobj.Predicate, line []byte) bool {
switch p := p.(type) {
case dataobj.LogMessageFilterPredicate:
return p.Keep(line)
case dataobj.AndPredicate[dataobj.LogsPredicate]:
return evalPredicate(p.Left, line) && evalPredicate(p.Right, line)
default:
t.Fatalf("unsupported predicate type: %T", p)
return false
}
}
// helper function to test predicates against sample data
testPredicate := func(t *testing.T, pred dataobj.Predicate, testData [][]byte, expected []bool) {
t.Helper()
require.Equal(t, len(testData), len(expected), "test data and expected results must have the same length")
for i, line := range testData {
result := evalPredicate(pred, line)
require.Equal(t, expected[i], result, "predicate mismatch for line: %s", line)
}
}
// Create test data sets
testData := [][]byte{
[]byte("this is an error message"),
[]byte("this is a critical error"),
[]byte("this is a success message"),
[]byte("this is a warning message"),
}
for _, tt := range []struct {
name string
query syntax.LogSelectorExpr
expectedExpr syntax.LogSelectorExpr
testFunc func(t *testing.T, pred dataobj.Predicate)
}{
{
name: "single line match equal filter",
query: mustParseLogSelector(t, `{app="foo"} |= "error"`),
expectedExpr: mustParseLogSelector(t, `{app="foo"}`), // Line filter should be removed
testFunc: func(t *testing.T, pred dataobj.Predicate) {
require.NotNil(t, pred)
require.IsType(t, dataobj.LogMessageFilterPredicate{}, pred)
// Verify the predicate works correctly
expected := []bool{true, true, false, false}
testPredicate(t, pred, testData, expected)
},
},
{
name: "single line match not equal filter",
query: mustParseLogSelector(t, `{app="foo"} != "error"`),
expectedExpr: mustParseLogSelector(t, `{app="foo"}`), // Line filter should be removed
testFunc: func(t *testing.T, pred dataobj.Predicate) {
require.NotNil(t, pred)
require.IsType(t, dataobj.LogMessageFilterPredicate{}, pred)
// Verify the predicate works correctly
expected := []bool{false, false, true, true}
testPredicate(t, pred, testData, expected)
},
},
{
name: "multiple line filters",
query: mustParseLogSelector(t, `{app="foo"} |= "error" |= "critical"`),
expectedExpr: mustParseLogSelector(t, `{app="foo"}`), // Both line filters should be removed
testFunc: func(t *testing.T, pred dataobj.Predicate) {
require.NotNil(t, pred)
// we might expect AND predicate here, but the original expression
// only contains a single Filterer stage with chained OR filters
require.IsType(t, dataobj.LogMessageFilterPredicate{}, pred)
// The result should match logs containing both "error" and "critical"
expected := []bool{false, true, false, false}
testPredicate(t, pred, testData, expected)
},
},
{
name: "line filter stage after line_format",
query: mustParseLogSelector(t, `{app="foo"} | line_format "{{.message}}" |= "error"`),
expectedExpr: mustParseLogSelector(t, `{app="foo"} | line_format "{{.message}}" |= "error"`), // No filters should be removed
testFunc: func(t *testing.T, pred dataobj.Predicate) {
// Line filters after stages that mutate the line should be ignored
require.Nil(t, pred, "expected nil predicate for line filter after parser")
},
},
{
name: "no line filter",
query: mustParseLogSelector(t, `{app="foo"} | json | bar>100`),
expectedExpr: mustParseLogSelector(t, `{app="foo"} | json | bar>100`), // No filters to remove
testFunc: func(t *testing.T, pred dataobj.Predicate) {
require.Nil(t, pred, "expected nil predicate for query without line filter")
},
},
{
name: "line filter stage after label_fmt",
query: mustParseLogSelector(t, `{app="foo"} | label_format foo=bar |= "error"`),
expectedExpr: mustParseLogSelector(t, `{app="foo"} | label_format foo=bar`), // Line filter should be removed
testFunc: func(t *testing.T, pred dataobj.Predicate) {
require.NotNil(t, pred)
require.IsType(t, dataobj.LogMessageFilterPredicate{}, pred)
// Verify the predicate works correctly
expected := []bool{true, true, false, false}
testPredicate(t, pred, testData, expected)
},
},
{
name: "mixed filters with some removable",
query: mustParseLogSelector(t, `{app="foo"} |= "error" | json | status="critical" |= "critical"`),
expectedExpr: mustParseLogSelector(t, `{app="foo"} | json | status="critical"`),
testFunc: func(t *testing.T, pred dataobj.Predicate) {
require.NotNil(t, pred)
require.IsType(t, dataobj.AndPredicate[dataobj.LogsPredicate]{}, pred)
expected := []bool{false, true, false, false}
testPredicate(t, pred, testData, expected)
},
},
} {
t.Run(tt.name, func(t *testing.T) {
p, actualExpr := buildLogsPredicateFromPipeline(tt.query)
tt.testFunc(t, p)
// Verify the updated expression
syntax.AssertExpressions(t, tt.expectedExpr, actualExpr)
})
}
}
func TestBuildLogsPredicateFromSampleExpr(t *testing.T) {
for _, tt := range []struct {
name string
query syntax.SampleExpr
expectedExpr syntax.SampleExpr
testFunc func(t *testing.T, pred dataobj.Predicate)
}{
{
name: "range aggregation with line filter",
query: mustParseSampleExpr(t, `count_over_time({app="foo"} |= "error" [5m])`),
expectedExpr: mustParseSampleExpr(t, `count_over_time({app="foo"}[5m])`),
testFunc: func(t *testing.T, p dataobj.Predicate) {
require.NotNil(t, p)
require.IsType(t, dataobj.LogMessageFilterPredicate{}, p)
},
},
{
name: "vector aggregation with line filter",
query: mustParseSampleExpr(t, `sum by (app)(count_over_time({app="foo"} |= "error"[5m]))`),
expectedExpr: mustParseSampleExpr(t, `sum by (app)(count_over_time({app="foo"}[5m]))`),
testFunc: func(t *testing.T, p dataobj.Predicate) {
require.NotNil(t, p)
require.IsType(t, dataobj.LogMessageFilterPredicate{}, p)
},
},
{
name: "binary expressions are not modified",
query: mustParseSampleExpr(t, `(count_over_time({app="foo"} |= "error"[5m]) + count_over_time({app="bar"} |= "info"[5m]))`),
expectedExpr: mustParseSampleExpr(t, `(count_over_time({app="foo"} |= "error"[5m]) + count_over_time({app="bar"} |= "info"[5m]))`),
testFunc: func(t *testing.T, p dataobj.Predicate) {
require.Nil(t, p)
},
},
{
name: "label replace with line filter",
query: mustParseSampleExpr(t, `label_replace(rate({app="foo"} |= "error"[5m]), "new_label", "new_value", "old_label", "old_value")`),
expectedExpr: mustParseSampleExpr(t, `label_replace(rate({app="foo"}[5m]),"new_label","new_value","old_label","old_value")`),
testFunc: func(t *testing.T, p dataobj.Predicate) {
require.NotNil(t, p)
require.IsType(t, dataobj.LogMessageFilterPredicate{}, p)
},
},
} {
t.Run(tt.name, func(t *testing.T) {
p, actualExpr := buildLogsPredicateFromSampleExpr(tt.query)
tt.testFunc(t, p)
// Verify the updated expression
syntax.AssertExpressions(t, tt.expectedExpr, actualExpr)
})
}
}
// Helper function to parse log selectors for test cases
func mustParseLogSelector(t *testing.T, s string) syntax.LogSelectorExpr {
expr, err := syntax.ParseLogSelector(s, false)
if err != nil {
t.Fatalf("failed to parse log selector %q: %v", s, err)
}
return expr
}
// Helper function to parse sample expressions for test cases
func mustParseSampleExpr(t *testing.T, s string) syntax.SampleExpr {
expr, err := syntax.ParseSampleExpr(s)
if err != nil {
t.Fatalf("failed to parse sample expr %q: %v", s, err)
}
return expr
}

@ -52,7 +52,7 @@ type Node struct {
Children []*Node
// Comments, like Children, are child nodes of the node, with the difference
// that comments are indented a level deeper than children. A common use-case
// for comments are tree-style properies of a node, such as expressions of a
// for comments are tree-style properties of a node, such as expressions of a
// physical plan node.
Comments []*Node
}

@ -133,28 +133,28 @@ func (t BinaryOpType) String() string {
}
}
// Expression is the common interface for all expressions in a physcial plan.
// Expression is the common interface for all expressions in a physical plan.
type Expression interface {
Type() ExpressionType
isExpr()
}
// UnaryExpression is the common interface for all unary expressions in a
// physcial plan.
// physical plan.
type UnaryExpression interface {
Expression
isUnaryExpr()
}
// BinaryExpression is the common interface for all binary expressions in a
// physcial plan.
// physical plan.
type BinaryExpression interface {
Expression
isBinaryExpr()
}
// LiteralExpression is the common interface for all literal expressions in a
// physcial plan.
// physical plan.
type LiteralExpression interface {
Expression
ValueType() ValueType
@ -162,7 +162,7 @@ type LiteralExpression interface {
}
// ColumnExpression is the common interface for all column expressions in a
// physcial plan.
// physical plan.
type ColumnExpression interface {
Expression
isColumnExpr()

@ -994,9 +994,7 @@ func TestIngestLimits_evictOldStreams(t *testing.T) {
// Assign the Partition IDs.
partitions := make(map[string][]int32)
partitions["test"] = make([]int32, 0, len(tt.assignedPartitionIDs))
for _, partitionID := range tt.assignedPartitionIDs {
partitions["test"] = append(partitions["test"], partitionID)
}
partitions["test"] = append(partitions["test"], tt.assignedPartitionIDs...)
s.partitionManager.Assign(context.Background(), nil, partitions)
// Call evictOldStreams

Loading…
Cancel
Save