chore(engine): Correctly propagate search direction to data object scan (#18112)

The sort order of the physical plan was only applied to the SortMerge node, but not to the DataObjScan node.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
pull/18124/head
Christian Haudum 10 months ago committed by GitHub
parent aac2860e86
commit 78342c030a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 16
      pkg/engine/executor/dataobjscan.go
  2. 12
      pkg/engine/executor/dataobjscan_test.go
  3. 11
      pkg/engine/planner/physical/dataobjscan.go
  4. 22
      pkg/engine/planner/physical/planner.go
  5. 8
      pkg/engine/planner/physical/sortmerge.go

@ -46,7 +46,7 @@ type dataobjScanOptions struct {
Predicates []logs.RowPredicate // Predicate to apply to the logs.
Projections []physical.ColumnExpression // Columns to include. An empty slice means all columns.
Direction physical.Direction // Direction of timestamps to return.
Direction physical.SortOrder // Order of timestamps to return (ASC=Forward, DESC=Backward)
Limit uint32 // A limit on the number of rows to return (0=unlimited).
}
@ -262,8 +262,12 @@ func (s *dataobjScan) read() (arrow.Record, error) {
return rb.NewRecord(), nil
}
func (s *dataobjScan) getLessFunc(dir physical.Direction) func(a, b logs.Record) bool {
// compareStreams is used when two records have the same timestamp.
// getLessFunc returns a "less comparison" function for records for the sort heap.
// direction determines the search order:
// DESC is a backward search starting at the end of the time range.
// ASC is a forward search starting at the beginning of the time range.
// If two records have the same timestamp, the compareStreams function is used to determine the sort order.
func (s *dataobjScan) getLessFunc(direction physical.SortOrder) func(a, b logs.Record) bool {
compareStreams := func(a, b logs.Record) bool {
aStream, ok := s.streams[a.StreamID]
if !ok {
@ -278,15 +282,15 @@ func (s *dataobjScan) getLessFunc(dir physical.Direction) func(a, b logs.Record)
return labels.Compare(aStream, bStream) < 0
}
switch dir {
case physical.Forward:
switch direction {
case physical.ASC:
return func(a, b logs.Record) bool {
	// Records with identical timestamps are ordered by their stream labels.
	// The result of compareStreams must be returned; previously it was
	// computed and discarded, so ties always fell through to the timestamp
	// comparison below and the tie-break never took effect.
	if a.Timestamp.Equal(b.Timestamp) {
		return compareStreams(a, b)
	}
	return a.Timestamp.After(b.Timestamp)
}
case physical.Backwards:
case physical.DESC:
return func(a, b logs.Record) bool {
if a.Timestamp.Equal(b.Timestamp) {
compareStreams(a, b)

@ -69,7 +69,7 @@ func Test_dataobjScan(t *testing.T) {
Object: obj,
StreamIDs: []int64{1, 2}, // All streams
Projections: nil, // All columns
Direction: physical.Forward,
Direction: physical.ASC,
Limit: 0, // No limit
})
@ -102,7 +102,7 @@ prod,loki,eeee-ffff-aaaa-bbbb,NULL,1970-01-01 00:00:10,goodbye world`
&physical.ColumnExpr{Ref: types.ColumnRef{Column: "timestamp", Type: types.ColumnTypeBuiltin}},
&physical.ColumnExpr{Ref: types.ColumnRef{Column: "env", Type: types.ColumnTypeLabel}},
},
Direction: physical.Forward,
Direction: physical.ASC,
Limit: 0, // No limit
})
@ -132,7 +132,7 @@ prod,loki,eeee-ffff-aaaa-bbbb,NULL,1970-01-01 00:00:10,goodbye world`
Projections: []physical.ColumnExpression{
&physical.ColumnExpr{Ref: types.ColumnRef{Column: "env", Type: types.ColumnTypeAmbiguous}},
},
Direction: physical.Forward,
Direction: physical.ASC,
Limit: 0, // No limit
})
@ -194,7 +194,7 @@ func Test_dataobjScan_DuplicateColumns(t *testing.T) {
Object: obj,
StreamIDs: []int64{1, 2, 3}, // All streams
Projections: nil, // All columns
Direction: physical.Forward,
Direction: physical.ASC,
Limit: 0, // No limit
})
@ -229,7 +229,7 @@ prod,namespace-2,NULL,loki,NULL,NULL,1970-01-01 00:00:03,message 3`
Projections: []physical.ColumnExpression{
&physical.ColumnExpr{Ref: types.ColumnRef{Column: "pod", Type: types.ColumnTypeAmbiguous}},
},
Direction: physical.Forward,
Direction: physical.ASC,
Limit: 0, // No limit
})
@ -256,7 +256,7 @@ NULL,NULL`
Projections: []physical.ColumnExpression{
&physical.ColumnExpr{Ref: types.ColumnRef{Column: "namespace", Type: types.ColumnTypeAmbiguous}},
},
Direction: physical.Forward,
Direction: physical.ASC,
Limit: 0, // No limit
})

@ -6,15 +6,6 @@ import "fmt"
// object storage.
type DataObjLocation string
// Direction defines the order in which the data object is read, which is
// either forward (ascending timestamps) or backwards (descending timestamps).
type Direction int8
const (
// Forward reads the data object in ascending timestamp order. It is the
// zero value of Direction.
Forward Direction = iota
// Backwards reads the data object in descending timestamp order.
Backwards
)
// DataObjScan represents a physical plan operation for reading data objects.
// It contains information about the object location, stream IDs, projections,
// predicates, scan direction, and result limit for reading data from a data
@ -37,7 +28,7 @@ type DataObjScan struct {
// only read the logs for the requested time range.
Predicates []Expression
// Direction defines the order in which the data object is read (ASC = ascending timestamps, DESC = descending timestamps).
Direction Direction
Direction SortOrder
// Limit is used to stop scanning the data object once it is reached.
Limit uint32
}

@ -7,6 +7,11 @@ import (
"github.com/grafana/loki/v3/pkg/engine/planner/logical"
)
// state holds the internal state of the planner that is carried along while
// a logical plan is converted into a physical plan.
type state struct {
// direction is the sort order recorded by processSort; it is copied onto
// DataObjScan nodes that are created afterwards.
direction SortOrder
}
// Planner creates an executable physical plan from a logical plan.
// Planning is done in two steps:
// 1. Convert
@ -18,6 +23,7 @@ import (
type Planner struct {
// catalog is the Catalog the planner was created with.
// NOTE(review): its use is not visible here — presumably it resolves data
// objects and streams for MakeTable; confirm against processMakeTable.
catalog Catalog
// plan is the physical plan under construction; reset assigns a fresh Plan.
plan *Plan
// state is the planner's internal state (currently the sort direction),
// cleared by reset.
state state
}
// NewPlanner creates a new planner instance with the given context.
@ -28,7 +34,7 @@ func NewPlanner(catalog Catalog) *Planner {
// Build converts a given logical plan into a physical plan and returns an error if the conversion fails.
// The resulting plan can be accessed using [Planner.Plan].
func (p *Planner) Build(lp *logical.Plan) (*Plan, error) {
p.plan = &Plan{}
p.reset()
for _, inst := range lp.Instructions {
switch inst := inst.(type) {
case *logical.Return:
@ -45,6 +51,12 @@ func (p *Planner) Build(lp *logical.Plan) (*Plan, error) {
return nil, errors.New("logical plan has no return value")
}
// reset clears the planner's internal state and discards any previously
// built plan by installing a fresh, empty Plan.
func (p *Planner) reset() {
	p.state = state{}
	p.plan = &Plan{}
}
// Convert a predicate from an [logical.Instruction] into an [Expression].
func (p *Planner) convertPredicate(inst logical.Value) Expression {
switch inst := inst.(type) {
@ -96,6 +108,7 @@ func (p *Planner) processMakeTable(lp *logical.MakeTable) ([]Node, error) {
node := &DataObjScan{
Location: objects[i],
StreamIDs: streams[i],
Direction: p.state.direction, // apply direction from previously visited Sort node
}
p.plan.addNode(node)
nodes = append(nodes, node)
@ -123,14 +136,15 @@ func (p *Planner) processSelect(lp *logical.Select) ([]Node, error) {
// Convert [logical.Sort] into one [SortMerge] node.
func (p *Planner) processSort(lp *logical.Sort) ([]Node, error) {
order := ASC
if !lp.Ascending {
order = DESC
order := DESC
if lp.Ascending {
order = ASC
}
node := &SortMerge{
Column: &ColumnExpr{Ref: lp.Column.Ref},
Order: order,
}
p.state.direction = order
p.plan.addNode(node)
children, err := p.process(lp.Table)
if err != nil {

@ -5,17 +5,17 @@ import "fmt"
type SortOrder uint8
const (
DESC SortOrder = iota
ASC
ASC SortOrder = iota
DESC
)
// String returns the string representation of the [SortOrder].
func (o SortOrder) String() string {
switch o {
case DESC:
return "DESC"
case ASC:
return "ASC"
case DESC:
return "DESC"
default:
return "UNDEFINED"
}

Loading…
Cancel
Save