chore(dataobj): introduce InPredicate to replace long chain of OR operators (#16919)

pull/14624/merge
Ashwanth 9 months ago committed by GitHub
parent 0a3230f457
commit 42eed6ded5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 9
      pkg/dataobj/internal/dataset/predicate.go
  2. 37
      pkg/dataobj/internal/dataset/reader.go
  3. 27
      pkg/dataobj/internal/dataset/reader_test.go
  4. 24
      pkg/dataobj/logs_reader.go

@ -29,6 +29,13 @@ type (
Value Value // Value to check equality for.
}
// An InPredicate is a [Predicate] which asserts that a row may only be
// included if the Value of the Column is present in the provided Values.
//
// A row matches when its value for Column compares equal to at least one
// element of Values; if Values is empty, no row matches. InPredicate is a
// flat alternative to a chain of OrPredicates over EqualPredicates on the
// same Column. It has no child predicates, and a NotPredicate wrapping it
// cannot currently be simplified.
InPredicate struct {
Column Column // Column to check.
Values []Value // Values to check for inclusion; a row matches if it equals any of them.
}
// A GreaterThanPredicate is a [Predicate] which asserts that a row may only
// be included if the Value of the Column is greater than the provided Value.
GreaterThanPredicate struct {
@ -65,6 +72,7 @@ func (OrPredicate) isPredicate() {}
func (NotPredicate) isPredicate() {}
func (FalsePredicate) isPredicate() {}
func (EqualPredicate) isPredicate() {}
func (InPredicate) isPredicate() {}
func (GreaterThanPredicate) isPredicate() {}
func (LessThanPredicate) isPredicate() {}
func (FuncPredicate) isPredicate() {}
@ -92,6 +100,7 @@ func WalkPredicate(p Predicate, fn func(p Predicate) bool) {
case FalsePredicate: // No children.
case EqualPredicate: // No children.
case InPredicate: // No children.
case GreaterThanPredicate: // No children.
case LessThanPredicate: // No children.
case FuncPredicate: // No children.

@ -211,6 +211,22 @@ func checkPredicate(p Predicate, lookup map[Column]int, row Row) bool {
}
return CompareValues(row.Values[columnIndex], p.Value) == 0
case InPredicate:
columnIndex, ok := lookup[p.Column]
if !ok {
panic("checkPredicate: column not found")
}
found := false
for _, v := range p.Values {
if CompareValues(row.Values[columnIndex], v) == 0 {
found = true
break
}
}
return found
case GreaterThanPredicate:
columnIndex, ok := lookup[p.Column]
if !ok {
@ -354,6 +370,8 @@ func (r *Reader) validatePredicate() error {
switch p := p.(type) {
case EqualPredicate:
err = process(p.Column)
case InPredicate:
err = process(p.Column)
case GreaterThanPredicate:
err = process(p.Column)
case LessThanPredicate:
@ -426,6 +444,8 @@ func (r *Reader) fillPrimaryMask(mask *bitmask.Mask) {
switch p := p.(type) {
case EqualPredicate:
process(p.Column)
case InPredicate:
process(p.Column)
case GreaterThanPredicate:
process(p.Column)
case LessThanPredicate:
@ -495,6 +515,9 @@ func (r *Reader) buildPredicateRanges(ctx context.Context, p Predicate) (rowRang
case EqualPredicate:
return r.buildColumnPredicateRanges(ctx, p.Column, p)
case InPredicate:
return r.buildColumnPredicateRanges(ctx, p.Column, p)
case GreaterThanPredicate:
return r.buildColumnPredicateRanges(ctx, p.Column, p)
@ -571,6 +594,10 @@ func simplifyNotPredicate(p NotPredicate) (Predicate, error) {
Right: GreaterThanPredicate(inner),
}, nil
case InPredicate:
// TODO: can be supported when we introduce NotInPredicate.
return nil, fmt.Errorf("can't simplify InPredicate")
case FuncPredicate:
return nil, fmt.Errorf("can't simplify FuncPredicate")
@ -580,7 +607,7 @@ func simplifyNotPredicate(p NotPredicate) (Predicate, error) {
}
// buildColumnPredicateRanges returns a set of rowRanges that are valid based
// on whether EqualPredicate, InPredicate, GreaterThanPredicate, or
// LessThanPredicate may be true for each page in a column.
func (r *Reader) buildColumnPredicateRanges(ctx context.Context, c Column, p Predicate) (rowRanges, error) {
// Get the wrapped column so that the result of c.ListPages can be cached.
@ -630,6 +657,14 @@ func (r *Reader) buildColumnPredicateRanges(ctx context.Context, c Column, p Pre
include = CompareValues(maxValue, p.Value) > 0
case LessThanPredicate: // LessThanPredicate may be true if minValue of a page is less than p.Value
include = CompareValues(minValue, p.Value) < 0
case InPredicate:
// Check if any value falls within the page's range
for _, v := range p.Values {
if CompareValues(v, minValue) >= 0 && CompareValues(v, maxValue) <= 0 {
include = true
break
}
}
default:
panic(fmt.Sprintf("unsupported predicate type %T", p))
}

@ -280,6 +280,33 @@ func Test_BuildPredicateRanges(t *testing.T) {
},
want: rowRanges{{Start: 0, End: 299}, {Start: 750, End: 999}}, // Rows 0 - 299, 750 - 999
},
{
name: "InPredicate with values inside and outside page ranges",
predicate: InPredicate{
Column: cols[1], // timestamp column
Values: []Value{
Int64Value(50), // Inside page 1 (0-100)
Int64Value(300), // Inside page 2 (200-500)
Int64Value(150), // Outside all pages
Int64Value(600), // Outside all pages
},
},
want: rowRanges{
{Start: 0, End: 249}, // Page 1: contains 50
{Start: 250, End: 749}, // Page 2: contains 300
},
},
{
name: "InPredicate with values all outside page ranges",
predicate: InPredicate{
Column: cols[1], // timestamp column
Values: []Value{
Int64Value(150), // Outside all pages
Int64Value(600), // Outside all pages
},
},
want: nil, // No pages should be included
},
}
ctx := context.Background()

@ -245,8 +245,6 @@ func (r *LogsReader) Close() error {
}
func streamIDPredicate(ids iter.Seq[int64], columns []dataset.Column, columnDesc []*logsmd.ColumnDesc) dataset.Predicate {
var res dataset.Predicate
streamIDColumn := findColumnFromDesc(columns, columnDesc, func(desc *logsmd.ColumnDesc) bool {
return desc.Type == logsmd.COLUMN_TYPE_STREAM_ID
})
@ -254,23 +252,19 @@ func streamIDPredicate(ids iter.Seq[int64], columns []dataset.Column, columnDesc
return dataset.FalsePredicate{}
}
var values []dataset.Value
for id := range ids {
p := dataset.EqualPredicate{
Column: streamIDColumn,
Value: dataset.Int64Value(id),
}
values = append(values, dataset.Int64Value(id))
}
if res == nil {
res = p
} else {
res = dataset.OrPredicate{
Left: res,
Right: p,
}
}
if len(values) == 0 {
return nil
}
return res
return dataset.InPredicate{
Column: streamIDColumn,
Values: values,
}
}
func translateLogsPredicate(p LogsPredicate, columns []dataset.Column, columnDesc []*logsmd.ColumnDesc) dataset.Predicate {

Loading…
Cancel
Save