mirror of https://github.com/grafana/loki
chore(engine): introduce ScanSet node (#19524)
Signed-off-by: Robert Fratto <robertfratto@gmail.com>
Branch: pull/19530/head
parent
b95bfabd42
commit
bd3f3dabe1
@ -1,248 +0,0 @@ |
||||
package executor |
||||
|
||||
import ( |
||||
"context" |
||||
"errors" |
||||
"fmt" |
||||
"slices" |
||||
"sort" |
||||
|
||||
"github.com/apache/arrow-go/v18/arrow" |
||||
"github.com/apache/arrow-go/v18/arrow/array" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical" |
||||
) |
||||
|
||||
type compareFunc[T comparable] func(a, b T) bool |
||||
|
||||
// NewSortMergePipeline returns a new pipeline that merges already sorted inputs into a single output.
|
||||
func NewSortMergePipeline(inputs []Pipeline, order physical.SortOrder, column physical.ColumnExpression, evaluator expressionEvaluator) (*KWayMerge, error) { |
||||
var lessFunc func(a, b int64) bool |
||||
switch order { |
||||
case physical.ASC: |
||||
lessFunc = func(a, b int64) bool { return a <= b } |
||||
case physical.DESC: |
||||
lessFunc = func(a, b int64) bool { return a >= b } |
||||
default: |
||||
return nil, fmt.Errorf("invalid sort order %v", order) |
||||
} |
||||
|
||||
for i := range inputs { |
||||
inputs[i] = newPrefetchingPipeline(inputs[i]) |
||||
} |
||||
|
||||
return &KWayMerge{ |
||||
inputs: inputs, |
||||
columnEval: evaluator.newFunc(column), |
||||
compare: lessFunc, |
||||
}, nil |
||||
} |
||||
|
||||
// KWayMerge is a k-way merge of multiple sorted inputs.
// It requires the input batches to be sorted in the same order (ASC/DESC) as the SortMerge operator itself.
// The sort order is defined by the direction of the query, which is either FORWARD or BACKWARDS,
// which is applied to the SortMerge as well as to the DataObjScan during query planning.
type KWayMerge struct {
	// inputs are the sorted pipelines being merged.
	inputs []Pipeline
	// initialized guards the one-time setup performed by init.
	initialized bool
	// batches holds the current batch per input; nil when none is loaded.
	batches []arrow.Record
	// exhausted marks inputs that have returned EOF.
	exhausted []bool
	// offsets tracks the next unconsumed row within each input's batch.
	offsets []int64
	// columnEval evaluates the sort column (the timestamp) against a batch.
	columnEval evalFunc
	// compare orders two timestamp values; defaults to ascending in init.
	compare compareFunc[int64]
}

var _ Pipeline = (*KWayMerge)(nil)
||||
|
||||
// Close implements Pipeline.
|
||||
func (p *KWayMerge) Close() { |
||||
for _, batch := range p.batches { |
||||
if batch != nil { |
||||
batch.Release() |
||||
} |
||||
} |
||||
for _, input := range p.inputs { |
||||
input.Close() |
||||
} |
||||
} |
||||
|
||||
// Read implements Pipeline.
|
||||
func (p *KWayMerge) Read(ctx context.Context) (arrow.Record, error) { |
||||
p.init(ctx) |
||||
return p.read(ctx) |
||||
} |
||||
|
||||
func (p *KWayMerge) init(ctx context.Context) { |
||||
if p.initialized { |
||||
return |
||||
} |
||||
|
||||
p.initialized = true |
||||
|
||||
n := len(p.inputs) |
||||
p.batches = make([]arrow.Record, n) |
||||
p.exhausted = make([]bool, n) |
||||
p.offsets = make([]int64, n) |
||||
|
||||
// Initialize pre-fetching on inputs
|
||||
for i := range p.inputs { |
||||
inp, ok := p.inputs[i].(*prefetchWrapper) |
||||
if ok { |
||||
inp.init(ctx) |
||||
} |
||||
} |
||||
|
||||
if p.compare == nil { |
||||
p.compare = func(a, b int64) bool { return a <= b } |
||||
} |
||||
} |
||||
|
||||
// Iterate through each record, looking at the value from their starting slice offset.
|
||||
// Track the top two winners (e.g., the record whose next value is the smallest and the record whose next value is the next smallest).
|
||||
// Find the largest offset in the starting record whose value is still less than the value of the runner-up record from the previous step.
|
||||
// Return the slice of that record using the two offsets, and update the stored offset of the returned record for the next call to Read.
|
||||
func (p *KWayMerge) read(ctx context.Context) (arrow.Record, error) { |
||||
start: |
||||
timestamps := make([]int64, 0, len(p.inputs)) |
||||
inputIndexes := make([]int, 0, len(p.inputs)) |
||||
|
||||
loop: |
||||
for i := range len(p.inputs) { |
||||
// Skip exhausted inputs
|
||||
if p.exhausted[i] { |
||||
continue loop |
||||
} |
||||
|
||||
// Load next batch if it hasn't been loaded yet, or if current one is already fully consumed
|
||||
// Read another batch as long as the input yields zero-length batches.
|
||||
for p.batches[i] == nil || p.offsets[i] == p.batches[i].NumRows() { |
||||
// Reset offset for input at index i
|
||||
p.offsets[i] = 0 |
||||
|
||||
// Release previously fully consumed batch
|
||||
if p.batches[i] != nil { |
||||
p.batches[i].Release() |
||||
p.batches[i] = nil // remove reference to arrow.Record from slice
|
||||
} |
||||
|
||||
// Read next batch from input at index i
|
||||
// If it reaches EOF, mark the input as exhausted and continue with the next input.
|
||||
rec, err := p.inputs[i].Read(ctx) |
||||
if err != nil { |
||||
if errors.Is(err, EOF) { |
||||
p.exhausted[i] = true |
||||
continue loop |
||||
} |
||||
return nil, err |
||||
} |
||||
|
||||
p.batches[i] = rec |
||||
} |
||||
|
||||
// Fetch timestamp value at current offset
|
||||
col, err := p.columnEval(p.batches[i]) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
defer col.Release() |
||||
|
||||
tsCol, ok := col.ToArray().(*array.Timestamp) |
||||
if !ok { |
||||
return nil, errors.New("column is not a timestamp column") |
||||
} |
||||
ts := tsCol.Value(int(p.offsets[i])) |
||||
tsCol.Release() |
||||
|
||||
// Populate slices for sorting
|
||||
inputIndexes = append(inputIndexes, i) |
||||
timestamps = append(timestamps, int64(ts)) |
||||
} |
||||
|
||||
// Pipeline is exhausted if no more input batches are available
|
||||
if !slices.Contains(p.exhausted, false) { |
||||
return nil, EOF |
||||
} |
||||
|
||||
if len(inputIndexes) == 0 { |
||||
goto start |
||||
} |
||||
|
||||
// If there is only a single remaining batch, return the remaining record
|
||||
if len(inputIndexes) == 1 { |
||||
j := inputIndexes[0] |
||||
start := p.offsets[j] |
||||
end := p.batches[j].NumRows() |
||||
|
||||
// check against empty last batch
|
||||
if start >= end || end == 0 { |
||||
return nil, EOF |
||||
} |
||||
|
||||
p.offsets[j] = end |
||||
return p.batches[j].NewSlice(start, end), nil |
||||
} |
||||
|
||||
sortIndexesByTimestamps(inputIndexes, timestamps, p.compare) |
||||
|
||||
// Return the slice of the current record
|
||||
j := inputIndexes[0] |
||||
|
||||
// Fetch timestamp value at current offset
|
||||
col, err := p.columnEval(p.batches[j]) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
defer col.Release() |
||||
// We assume the column is a Uint64 array
|
||||
tsCol, ok := col.ToArray().(*array.Timestamp) |
||||
if !ok { |
||||
return nil, errors.New("column is not a timestamp column") |
||||
} |
||||
defer tsCol.Release() |
||||
|
||||
// Calculate start/end of the sub-slice of the record
|
||||
start := p.offsets[j] |
||||
end := start + 1 |
||||
for ; end < p.batches[j].NumRows(); end++ { |
||||
ts := tsCol.Value(int(end)) |
||||
if !p.compare(int64(ts), timestamps[1]) { |
||||
break |
||||
} |
||||
} |
||||
|
||||
// check against empty batch
|
||||
if start > end || end == 0 { |
||||
p.offsets[j] = end |
||||
return p.batches[j], nil |
||||
} |
||||
|
||||
p.offsets[j] = end |
||||
return p.batches[j].NewSlice(start, end), nil |
||||
} |
||||
|
||||
func sortIndexesByTimestamps(indexes []int, timestamps []int64, lessFn compareFunc[int64]) { |
||||
if len(indexes) != len(timestamps) { |
||||
panic("lengths of indexes and timestamps must match") |
||||
} |
||||
|
||||
pairs := make([]inputTimestampPair, len(indexes)) |
||||
for i := range indexes { |
||||
pairs[i] = inputTimestampPair{indexes[i], timestamps[i]} |
||||
} |
||||
|
||||
// Sort pairs by timestamp
|
||||
sort.SliceStable(pairs, func(i, j int) bool { |
||||
return lessFn(pairs[i].timestamp, pairs[j].timestamp) |
||||
}) |
||||
|
||||
// Unpack the sorted pairs back into the original slices
|
||||
for i := range pairs { |
||||
indexes[i] = pairs[i].index |
||||
timestamps[i] = pairs[i].timestamp |
||||
} |
||||
} |
||||
|
||||
type inputTimestampPair struct { |
||||
index int |
||||
timestamp int64 |
||||
} |
||||
@ -1,146 +0,0 @@ |
||||
package executor |
||||
|
||||
import ( |
||||
"slices" |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/apache/arrow-go/v18/arrow" |
||||
"github.com/apache/arrow-go/v18/arrow/array" |
||||
"github.com/stretchr/testify/require" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical" |
||||
"github.com/grafana/loki/v3/pkg/engine/internal/types" |
||||
) |
||||
|
||||
func TestSortMerge(t *testing.T) { |
||||
now := time.Unix(1000000, 0) |
||||
var batchSize = int64(3) |
||||
|
||||
c := &Context{ |
||||
batchSize: batchSize, |
||||
} |
||||
|
||||
t.Run("invalid column name", func(t *testing.T) { |
||||
merge := &physical.SortMerge{ |
||||
Column: &physical.ColumnExpr{ |
||||
Ref: types.ColumnRef{ |
||||
Column: "not_a_timestamp_column", |
||||
Type: types.ColumnTypeBuiltin, |
||||
}, |
||||
}, |
||||
Order: physical.ASC, |
||||
} |
||||
|
||||
inputs := []Pipeline{ |
||||
ascendingTimestampPipeline(now.Add(1*time.Nanosecond)).Pipeline(batchSize, 10), |
||||
ascendingTimestampPipeline(now.Add(2*time.Nanosecond)).Pipeline(batchSize, 10), |
||||
ascendingTimestampPipeline(now.Add(3*time.Nanosecond)).Pipeline(batchSize, 10), |
||||
} |
||||
|
||||
pipeline, err := NewSortMergePipeline(inputs, merge.Order, merge.Column, expressionEvaluator{}) |
||||
require.NoError(t, err) |
||||
|
||||
ctx := t.Context() |
||||
_, err = pipeline.Read(ctx) |
||||
require.ErrorContains(t, err, "column is not a timestamp column") |
||||
}) |
||||
|
||||
t.Run("ascending timestamp", func(t *testing.T) { |
||||
merge := &physical.SortMerge{ |
||||
Column: &physical.ColumnExpr{ |
||||
Ref: types.ColumnRef{ |
||||
Column: types.ColumnNameBuiltinTimestamp, |
||||
Type: types.ColumnTypeBuiltin, |
||||
}, |
||||
}, |
||||
Order: physical.ASC, |
||||
} |
||||
|
||||
inputs := []Pipeline{ |
||||
ascendingTimestampPipeline(now.Add(1*time.Nanosecond)).Pipeline(batchSize, 10), |
||||
ascendingTimestampPipeline(now.Add(2*time.Millisecond)).Pipeline(batchSize, 10), |
||||
ascendingTimestampPipeline(now.Add(3*time.Second)).Pipeline(batchSize, 10), |
||||
} |
||||
|
||||
pipeline, err := NewSortMergePipeline(inputs, merge.Order, merge.Column, expressionEvaluator{}) |
||||
require.NoError(t, err) |
||||
|
||||
ctx := t.Context() |
||||
timestamps := make([]arrow.Timestamp, 0, 30) |
||||
var batches, rows int64 |
||||
for { |
||||
batch, err := pipeline.Read(ctx) |
||||
if err == EOF { |
||||
break |
||||
} |
||||
if err != nil { |
||||
t.Fatalf("did not expect error, got %s", err.Error()) |
||||
} |
||||
|
||||
tsCol, err := c.evaluator.eval(merge.Column, batch) |
||||
require.NoError(t, err) |
||||
defer tsCol.Release() |
||||
arr := tsCol.ToArray().(*array.Timestamp) |
||||
defer arr.Release() |
||||
|
||||
timestamps = append(timestamps, arr.Values()...) |
||||
batches++ |
||||
rows += batch.NumRows() |
||||
} |
||||
|
||||
// Check if ts column is sorted
|
||||
require.Truef(t, |
||||
slices.IsSortedFunc(timestamps, func(a, b arrow.Timestamp) int { return int(a - b) }), |
||||
"timestamps are not sorted in ASC order: %v", timestamps) |
||||
}) |
||||
|
||||
t.Run("descending timestamp", func(t *testing.T) { |
||||
merge := &physical.SortMerge{ |
||||
Column: &physical.ColumnExpr{ |
||||
Ref: types.ColumnRef{ |
||||
Column: types.ColumnNameBuiltinTimestamp, |
||||
Type: types.ColumnTypeBuiltin, |
||||
}, |
||||
}, |
||||
Order: physical.DESC, |
||||
} |
||||
|
||||
inputs := []Pipeline{ |
||||
descendingTimestampPipeline(now.Add(1*time.Nanosecond)).Pipeline(batchSize, 10), |
||||
descendingTimestampPipeline(now.Add(2*time.Millisecond)).Pipeline(batchSize, 10), |
||||
descendingTimestampPipeline(now.Add(3*time.Second)).Pipeline(batchSize, 10), |
||||
} |
||||
|
||||
pipeline, err := NewSortMergePipeline(inputs, merge.Order, merge.Column, expressionEvaluator{}) |
||||
require.NoError(t, err) |
||||
|
||||
ctx := t.Context() |
||||
timestamps := make([]arrow.Timestamp, 0, 30) |
||||
var batches, rows int64 |
||||
for { |
||||
batch, err := pipeline.Read(ctx) |
||||
if err == EOF { |
||||
break |
||||
} |
||||
if err != nil { |
||||
t.Fatalf("did not expect error, got %s", err.Error()) |
||||
} |
||||
|
||||
tsCol, err := c.evaluator.eval(merge.Column, batch) |
||||
defer tsCol.Release() |
||||
require.NoError(t, err) |
||||
arr := tsCol.ToArray().(*array.Timestamp) |
||||
defer arr.Release() |
||||
|
||||
timestamps = append(timestamps, arr.Values()...) |
||||
batches++ |
||||
rows += batch.NumRows() |
||||
} |
||||
|
||||
// Check if ts column is sorted
|
||||
require.Truef(t, |
||||
slices.IsSortedFunc(timestamps, func(a, b arrow.Timestamp) int { return int(b - a) }), |
||||
"timestamps are not sorted in DESC order: %v", timestamps) |
||||
}) |
||||
} |
||||
@ -1,31 +0,0 @@ |
||||
package physical |
||||
|
||||
import "fmt" |
||||
|
||||
// Merge represents a merge operation in the physical plan that merges
|
||||
// N inputs to 1 output.
|
||||
type Merge struct { |
||||
id string |
||||
} |
||||
|
||||
// ID implements the [Node] interface.
|
||||
// Returns a string that uniquely identifies the node in the plan.
|
||||
func (m *Merge) ID() string { |
||||
if m.id == "" { |
||||
return fmt.Sprintf("%p", m) |
||||
} |
||||
|
||||
return m.id |
||||
} |
||||
|
||||
// Type implements the [Node] interface.
|
||||
// Returns the type of the node.
|
||||
func (m *Merge) Type() NodeType { |
||||
return NodeTypeMerge |
||||
} |
||||
|
||||
// Accept implements the [Node] interface.
|
||||
// Dispatches itself to the provided [Visitor] v
|
||||
func (m *Merge) Accept(v Visitor) error { |
||||
return v.VisitMerge(m) |
||||
} |
||||
@ -0,0 +1,70 @@ |
||||
package physical |
||||
|
||||
import ( |
||||
"fmt" |
||||
) |
||||
|
||||
// ScanTarget represents a target of a [ScanSet].
type ScanTarget struct {
	// Type indicates which target field below is populated.
	Type ScanType

	// DataObject is non-nil if Type is [ScanTypeDataObject]. Despite DataObjScan
	// implementing [Node], the value is not inserted into the graph as a node.
	DataObject *DataObjScan
}
||||
|
||||
// ScanType represents the data being scanned in a target of a [ScanSet].
type ScanType int

const (
	// ScanTypeInvalid is the zero value and denotes an unset scan type.
	ScanTypeInvalid ScanType = iota
	// ScanTypeDataObject denotes a data object scan target.
	ScanTypeDataObject
)

// String returns a string representation of the scan type. Unknown values
// render as "ScanType(N)".
func (ty ScanType) String() string {
	switch ty {
	case ScanTypeDataObject:
		return "ScanTypeDataObject"
	case ScanTypeInvalid:
		return "ScanTypeInvalid"
	default:
		return fmt.Sprintf("ScanType(%d)", ty)
	}
}
||||
|
||||
// ScanSet represents a physical plan operation for reading data from targets.
|
||||
type ScanSet struct { |
||||
id string |
||||
|
||||
// Targets to scan.
|
||||
Targets []*ScanTarget |
||||
|
||||
// Projections are used to limit the columns that are read to the ones
|
||||
// provided in the column expressions to reduce the amount of data that
|
||||
// needs to be processed.
|
||||
Projections []ColumnExpression |
||||
|
||||
// Predicates are used to filter rows to reduce the amount of rows that are
|
||||
// returned. Predicates would almost always contain a time range filter to
|
||||
// only read the logs for the requested time range.
|
||||
Predicates []Expression |
||||
} |
||||
|
||||
// ID returns a string that uniquely identifies the node in the plan.
|
||||
func (s *ScanSet) ID() string { |
||||
if s.id == "" { |
||||
return fmt.Sprintf("%p", s) |
||||
} |
||||
return s.id |
||||
} |
||||
|
||||
// Type returns [NodeTypeScanSet].
|
||||
func (s *ScanSet) Type() NodeType { |
||||
return NodeTypeScanSet |
||||
} |
||||
|
||||
// Accept dispatches s to the provided [Visitor] v.
|
||||
func (s *ScanSet) Accept(v Visitor) error { |
||||
return v.VisitScanSet(s) |
||||
} |
||||
@ -0,0 +1,23 @@ |
||||
package physical |
||||
|
||||
// SortOrder denotes the direction rows are ordered in: unsorted, ascending,
// or descending.
type SortOrder uint8

const (
	UNSORTED SortOrder = iota
	ASC
	DESC
)

// String returns the string representation of the [SortOrder]. Values outside
// the defined set render as "UNDEFINED".
func (o SortOrder) String() string {
	switch o {
	case ASC:
		return "ASC"
	case DESC:
		return "DESC"
	case UNSORTED:
		return "UNSORTED"
	}
	return "UNDEFINED"
}
||||
@ -1,63 +0,0 @@ |
||||
package physical |
||||
|
||||
import "fmt" |
||||
|
||||
// SortOrder denotes the direction rows are ordered in: unsorted, ascending,
// or descending.
type SortOrder uint8

const (
	UNSORTED SortOrder = iota
	ASC
	DESC
)

// String returns the string representation of the [SortOrder]. Values outside
// the defined set render as "UNDEFINED".
func (o SortOrder) String() string {
	switch o {
	case ASC:
		return "ASC"
	case DESC:
		return "DESC"
	case UNSORTED:
		return "UNSORTED"
	}
	return "UNDEFINED"
}
||||
|
||||
// SortMerge represents a sort+merge operation in the physical plan. It
|
||||
// performs sorting of data based on the specified Column and Order direction.
|
||||
type SortMerge struct { |
||||
id string |
||||
|
||||
// Column defines the column expression by which the rows should be sorted.
|
||||
// This is almost always the timestamp column, because it is the column
|
||||
// by which the results of the DataObjScan node are sorted. This allows
|
||||
// for sorting and merging multiple already sorted inputs from the DataObjScan
|
||||
// without being a pipeline breaker.
|
||||
Column ColumnExpression |
||||
// Order defines whether the column should be sorted in ascending or
|
||||
// descending order. Must match the read direction of the DataObjScan that
|
||||
// feeds into the SortMerge.
|
||||
Order SortOrder |
||||
} |
||||
|
||||
// ID implements the [Node] interface.
|
||||
// Returns a string that uniquely identifies the node in the plan.
|
||||
func (m *SortMerge) ID() string { |
||||
if m.id == "" { |
||||
return fmt.Sprintf("%p", m) |
||||
} |
||||
return m.id |
||||
} |
||||
|
||||
// Type implements the [Node] interface.
|
||||
// Returns the type of the node.
|
||||
func (*SortMerge) Type() NodeType { |
||||
return NodeTypeSortMerge |
||||
} |
||||
|
||||
// Accept implements the [Node] interface.
|
||||
// Dispatches itself to the provided [Visitor] v
|
||||
func (m *SortMerge) Accept(v Visitor) error { |
||||
return v.VisitSortMerge(m) |
||||
} |
||||
Loading…
Reference in new issue