chore(dataobj): read non-projected predicate columns (#18797)

pull/18841/head
Ashwanth 9 months ago committed by GitHub
parent bf0334394c
commit 88c8b44f2d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 55
      pkg/dataobj/sections/logs/predicate.go
  2. 88
      pkg/dataobj/sections/logs/reader.go
  3. 88
      pkg/dataobj/sections/logs/reader_test.go
  4. 4
      pkg/logql/bench/generator_query.go

@ -1,6 +1,10 @@
package logs
import "github.com/apache/arrow-go/v18/arrow/scalar"
import (
"fmt"
"github.com/apache/arrow-go/v18/arrow/scalar"
)
// Predicate is an expression used to filter column values in a [Reader].
type Predicate interface{ isPredicate() }
@ -116,3 +120,52 @@ func walkPredicate(p Predicate, fn func(Predicate) bool) {
fn(nil)
}
// predicateColumns returns a slice of all columns referenced in the given predicates.
// It ensures that each column is only included once, even if it appears in multiple predicates.
// predicateColumns collects every distinct column referenced by the given
// predicates. A column appearing in multiple predicates is returned only
// once, in order of first appearance.
func predicateColumns(predicates []Predicate) []*Column {
	var (
		seen   = make(map[*Column]struct{})
		result = make([]*Column, 0, len(predicates))
	)

	// record adds c to the result unless it has been collected already.
	record := func(c *Column) {
		if _, dup := seen[c]; dup {
			return
		}
		seen[c] = struct{}{}
		result = append(result, c)
	}

	for _, pred := range predicates {
		walkPredicate(pred, func(p Predicate) bool {
			switch p := p.(type) {
			case nil, AndPredicate, OrPredicate, NotPredicate, TruePredicate, FalsePredicate:
				// These nodes carry no column reference; nothing to do.
			case EqualPredicate:
				record(p.Column)
			case InPredicate:
				record(p.Column)
			case GreaterThanPredicate:
				record(p.Column)
			case LessThanPredicate:
				record(p.Column)
			case FuncPredicate:
				record(p.Column)
			default:
				panic(fmt.Sprintf("logs.predicateColumns: unsupported predicate type %T", p))
			}
			return true
		})
	}

	return result
}

@ -40,35 +40,26 @@ type ReaderOptions struct {
// Validate returns an error if the opts is not valid. ReaderOptions are only
// valid when:
//
// - Each [Column] in Columns belongs to the same [Section].
// - Each [Predicate] in Predicates references a [Column] from Columns.
// - Each [Column] in Columns and Predicates belongs to the same [Section].
// - Scalar values used in predicates are of a supported type: an int64,
// uint64, timestamp, or a byte array.
func (opts *ReaderOptions) Validate() error {
columnLookup := make(map[*Column]struct{}, len(opts.Columns))
if len(opts.Columns) > 0 {
// Ensure all columns belong to the same section.
var checkSection *Section
for _, col := range opts.Columns {
if checkSection != nil && col.Section != checkSection {
return fmt.Errorf("all columns must belong to the same section: got=%p want=%p", col.Section, checkSection)
} else if checkSection == nil {
checkSection = col.Section
}
columnLookup[col] = struct{}{}
// Ensure all columns belong to the same section.
var checkSection *Section
var errs []error
validateSection := func(col *Column) {
if checkSection != nil && col.Section != checkSection {
errs = append(errs, fmt.Errorf("all columns must belong to the same section: got=%p want=%p", col.Section, checkSection))
} else if checkSection == nil {
checkSection = col.Section
}
}
var errs []error
validateColumn := func(col *Column) {
if col == nil {
errs = append(errs, fmt.Errorf("column is nil"))
} else if _, found := columnLookup[col]; !found {
errs = append(errs, fmt.Errorf("column %p not in Columns", col))
}
for _, col := range opts.Columns {
validateSection(col)
}
if len(errs) > 0 {
return errors.Join(errs...)
}
validateScalar := func(s scalar.Scalar) {
@ -80,8 +71,7 @@ func (opts *ReaderOptions) Validate() error {
for _, p := range opts.Predicates {
walkPredicate(p, func(p Predicate) bool {
// Validate that predicates reference valid columns and use valid
// scalars.
// Validate that predicates use valid scalars.
switch p := p.(type) {
case nil: // End of walk; nothing to do.
@ -92,25 +82,25 @@ func (opts *ReaderOptions) Validate() error {
case FalsePredicate: // Nothing to do.
case EqualPredicate:
validateColumn(p.Column)
validateSection(p.Column)
validateScalar(p.Value)
case InPredicate:
validateColumn(p.Column)
validateSection(p.Column)
for _, val := range p.Values {
validateScalar(val)
}
case GreaterThanPredicate:
validateColumn(p.Column)
validateSection(p.Column)
validateScalar(p.Value)
case LessThanPredicate:
validateColumn(p.Column)
validateSection(p.Column)
validateScalar(p.Value)
case FuncPredicate:
validateColumn(p.Column)
validateSection(p.Column)
default:
errs = append(errs, fmt.Errorf("unrecognized predicate type %T", p))
@ -186,6 +176,11 @@ func (r *Reader) Read(ctx context.Context, batchSize int) (arrow.Record, error)
row := r.buf[rowIndex]
for columnIndex, val := range row.Values {
if columnIndex >= len(r.opts.Columns) {
// Ignore columns that are not in projection list.
continue
}
columnBuilder := builder.Field(columnIndex)
if val.IsNil() {
@ -231,16 +226,23 @@ func (r *Reader) init() error {
r.opts.Allocator = memory.DefaultAllocator
}
dset, err := newColumnsDataset(r.opts.Columns)
// Compose dataset using projected columns and any additional columns
// used for evaluating predicates.
//
// Non-projected columns are appended to the end of the list to allow
// easy filtering of Row Values with index >= len(r.opts.Columns).
cols := appendMissingColumns(r.opts.Columns, predicateColumns(r.opts.Predicates))
dset, err := newColumnsDataset(cols)
if err != nil {
return fmt.Errorf("creating dataset: %w", err)
} else if len(dset.Columns()) != len(r.opts.Columns) {
return fmt.Errorf("dataset has %d columns, expected %d", len(dset.Columns()), len(r.opts.Columns))
} else if len(dset.Columns()) != len(cols) {
return fmt.Errorf("dataset has %d columns, expected %d", len(dset.Columns()), len(cols))
}
columnLookup := make(map[*Column]dataset.Column, len(r.opts.Columns))
columnLookup := make(map[*Column]dataset.Column, len(cols))
for i, col := range dset.Columns() {
columnLookup[r.opts.Columns[i]] = col
columnLookup[cols[i]] = col
}
preds, err := mapPredicates(r.opts.Predicates, columnLookup)
@ -422,6 +424,22 @@ func (r *Reader) Close() error {
return nil
}
// appendMissingColumns appends to dst every column from src that is not
// already present in dst, returning the extended slice. Presence is
// determined by pointer identity, and the relative order of both dst and
// the appended src columns is preserved.
//
// Fix over the previous version: newly appended columns are now recorded
// in the lookup set, so a column repeated within src is appended at most
// once instead of once per occurrence.
func appendMissingColumns(dst, src []*Column) []*Column {
	exists := make(map[*Column]struct{}, len(dst)+len(src))
	for _, col := range dst {
		exists[col] = struct{}{}
	}
	for _, col := range src {
		if _, ok := exists[col]; ok {
			continue
		}
		// Record the appended column so duplicates inside src are skipped.
		exists[col] = struct{}{}
		dst = append(dst, col)
	}
	return dst
}
func columnsSchema(cols []*Column) *arrow.Schema {
fields := make([]arrow.Field, 0, len(cols))
for _, col := range cols {

@ -45,45 +45,67 @@ func TestReader(t *testing.T) {
require.Equal(t, "", message.Name)
require.Equal(t, logs.ColumnTypeMessage, message.Type)
r := logs.NewReader(logs.ReaderOptions{
Columns: []*logs.Column{streamID, traceID, message},
Allocator: alloc,
Predicates: []logs.Predicate{
logs.FuncPredicate{
Column: traceID,
Keep: func(_ *logs.Column, value scalar.Scalar) bool {
if !value.IsValid() {
return false
}
bb := value.(*scalar.String).Value.Bytes()
return bytes.Equal(bb, []byte("abcdef")) || bytes.Equal(bb, []byte("123456"))
},
for _, tt := range []struct {
name string
columns []*logs.Column
expected arrowtest.Rows
}{
{
name: "basic reads with predicate",
columns: []*logs.Column{streamID, traceID, message},
expected: arrowtest.Rows{
{"stream_id.int64": int64(2), "trace_id.metadata.utf8": "123456", "message.utf8": "foo bar"},
{"stream_id.int64": int64(1), "trace_id.metadata.utf8": "abcdef", "message.utf8": "goodbye, world!"},
},
logs.InPredicate{
Column: streamID,
Values: []scalar.Scalar{
scalar.NewInt64Scalar(1),
scalar.NewInt64Scalar(2),
},
},
// tests that the reader evaluates predicates correctly even when predicate columns are not projected.
{
name: "reads with predicate columns that are not projected",
columns: []*logs.Column{streamID, message},
expected: arrowtest.Rows{
{"stream_id.int64": int64(2), "message.utf8": "foo bar"},
{"stream_id.int64": int64(1), "message.utf8": "goodbye, world!"},
},
},
})
} {
t.Run(tt.name, func(t *testing.T) {
r := logs.NewReader(logs.ReaderOptions{
Columns: tt.columns,
Allocator: alloc,
Predicates: []logs.Predicate{
logs.FuncPredicate{
Column: traceID,
Keep: func(_ *logs.Column, value scalar.Scalar) bool {
if !value.IsValid() {
return false
}
bb := value.(*scalar.String).Value.Bytes()
return bytes.Equal(bb, []byte("abcdef")) || bytes.Equal(bb, []byte("123456"))
},
},
logs.InPredicate{
Column: streamID,
Values: []scalar.Scalar{
scalar.NewInt64Scalar(1),
scalar.NewInt64Scalar(2),
},
},
},
})
expect := arrowtest.Rows{
{"stream_id.int64": int64(2), "trace_id.metadata.utf8": "123456", "message.utf8": "foo bar"},
{"stream_id.int64": int64(1), "trace_id.metadata.utf8": "abcdef", "message.utf8": "goodbye, world!"},
}
actualTable, err := readTable(context.Background(), r)
if actualTable != nil {
defer actualTable.Release()
}
require.NoError(t, err)
actualTable, err := readTable(context.Background(), r)
if actualTable != nil {
defer actualTable.Release()
}
require.NoError(t, err)
actual, err := arrowtest.TableRows(alloc, actualTable)
require.NoError(t, err, "failed to get rows from table")
require.Equal(t, tt.expected, actual)
})
actual, err := arrowtest.TableRows(alloc, actualTable)
require.NoError(t, err, "failed to get rows from table")
require.Equal(t, expect, actual)
}
}
func buildSection(t *testing.T, recs []logs.Record) *logs.Section {

@ -191,6 +191,9 @@ func (g *TestCaseGenerator) Generate() []TestCase {
// Basic selector
addBidirectional(selector, g.logGenCfg.StartTime, end)
// With line filter
addBidirectional(selector+` |= "level"`, g.logGenCfg.StartTime, end)
// With structured metadata filters
addBidirectional(selector+` | detected_level="error"`, g.logGenCfg.StartTime, end)
addBidirectional(selector+` | detected_level="warn"`, g.logGenCfg.StartTime, end)
@ -209,6 +212,7 @@ func (g *TestCaseGenerator) Generate() []TestCase {
baseRangeAggregationQueries := []string{
fmt.Sprintf(`count_over_time(%s[%s])`, selector, rangeInterval),
fmt.Sprintf(`count_over_time(%s | detected_level=~"error|warn" [%s])`, selector, rangeInterval),
fmt.Sprintf(`count_over_time(%s |= "level" [%s])`, selector, rangeInterval),
fmt.Sprintf(`rate(%s | detected_level=~"error|warn" [%s])`, selector, rangeInterval),
}

Loading…
Cancel
Save