mirror of https://github.com/grafana/loki
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
301 lines
8.4 KiB
301 lines
8.4 KiB
package executor
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/apache/arrow-go/v18/arrow"
|
|
"github.com/apache/arrow-go/v18/arrow/array"
|
|
"github.com/apache/arrow-go/v18/arrow/memory"
|
|
"github.com/cespare/xxhash/v2"
|
|
|
|
"github.com/grafana/loki/v3/pkg/engine/internal/datatype"
|
|
"github.com/grafana/loki/v3/pkg/engine/internal/types"
|
|
"github.com/grafana/loki/v3/pkg/engine/planner/physical"
|
|
)
|
|
|
|
type rangeAggregationOptions struct {
|
|
partitionBy []physical.ColumnExpression
|
|
|
|
// start and end timestamps are equal for instant queries.
|
|
startTs time.Time // start timestamp of the query
|
|
endTs time.Time // end timestamp of the query
|
|
rangeInterval time.Duration // range interval
|
|
step time.Duration // step used for range queries
|
|
}
|
|
|
|
// RangeAggregationPipeline is a pipeline that performs aggregations over a time window.
|
|
//
|
|
// 1. It reads from the input pipelines
|
|
// 2. Partitions the data by the specified columns
|
|
// 3. Applies the aggregation function on each partition
|
|
//
|
|
// Current version only supports counting for instant queries.
|
|
type RangeAggregationPipeline struct {
|
|
state state
|
|
inputs []Pipeline
|
|
|
|
aggregator *partitionAggregator
|
|
evaluator expressionEvaluator // used to evaluate column expressions
|
|
opts rangeAggregationOptions
|
|
}
|
|
|
|
func NewRangeAggregationPipeline(inputs []Pipeline, evaluator expressionEvaluator, opts rangeAggregationOptions) (*RangeAggregationPipeline, error) {
|
|
return &RangeAggregationPipeline{
|
|
inputs: inputs,
|
|
evaluator: evaluator,
|
|
aggregator: newPartitionAggregator(),
|
|
opts: opts,
|
|
}, nil
|
|
}
|
|
|
|
// Read reads the next value into its state.
|
|
// It returns an error if reading fails or when the pipeline is exhausted. In this case, the function returns EOF.
|
|
// The implementation must retain the returned error in its state and return it with subsequent Value() calls.
|
|
func (r *RangeAggregationPipeline) Read(ctx context.Context) error {
|
|
// if the state already has an error, do not attempt to read.
|
|
if r.state.err != nil {
|
|
return r.state.err
|
|
}
|
|
|
|
if r.state.batch != nil {
|
|
r.state.batch.Release()
|
|
}
|
|
|
|
record, err := r.read(ctx)
|
|
r.state = newState(record, err)
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("run range aggregation: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// TODOs:
|
|
// - Support implicit partitioning by all labels when partitionBy is empty
|
|
// - Use columnar access pattern. Current approach is row-based which does not benefit from the storage format.
|
|
// - Add toggle to return partial results on Read() call instead of returning only after exhausing all inputs.
|
|
func (r *RangeAggregationPipeline) read(ctx context.Context) (arrow.Record, error) {
|
|
var (
|
|
isTSInRange func(t time.Time) bool
|
|
tsColumnExpr = &physical.ColumnExpr{
|
|
Ref: types.ColumnRef{
|
|
Column: types.ColumnNameBuiltinTimestamp,
|
|
Type: types.ColumnTypeBuiltin,
|
|
},
|
|
} // timestamp column expression
|
|
|
|
// reused on each row read
|
|
labelValues = make([]string, len(r.opts.partitionBy))
|
|
)
|
|
|
|
{
|
|
evalTs := r.opts.endTs
|
|
earliestTs := r.opts.endTs.Add(-r.opts.rangeInterval)
|
|
isTSInRange = func(t time.Time) bool {
|
|
// Aggregate entries that belong in [earliestTs, evalTs)
|
|
return t.Compare(earliestTs) >= 0 && t.Compare(evalTs) < 0
|
|
}
|
|
}
|
|
|
|
r.aggregator.Reset() // reset before reading new inputs
|
|
inputsExhausted := false
|
|
for !inputsExhausted {
|
|
inputsExhausted = true
|
|
|
|
for _, input := range r.inputs {
|
|
if err := input.Read(ctx); err != nil {
|
|
if errors.Is(err, EOF) {
|
|
continue
|
|
}
|
|
|
|
return nil, err
|
|
}
|
|
|
|
inputsExhausted = false
|
|
record, _ := input.Value()
|
|
|
|
// extract all the columns that are used for partitioning
|
|
arrays := make([]*array.String, 0, len(r.opts.partitionBy))
|
|
for _, columnExpr := range r.opts.partitionBy {
|
|
vec, err := r.evaluator.eval(columnExpr, record)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if vec.Type() != datatype.Loki.String {
|
|
return nil, fmt.Errorf("unsupported datatype for partitioning %s", vec.Type())
|
|
}
|
|
|
|
arrays = append(arrays, vec.ToArray().(*array.String))
|
|
}
|
|
|
|
// extract timestamp column to check if the entry is in range
|
|
vec, err := r.evaluator.eval(tsColumnExpr, record)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tsCol := vec.ToArray().(*array.Timestamp)
|
|
|
|
for row := range int(record.NumRows()) {
|
|
if !isTSInRange(tsCol.Value(row).ToTime(arrow.Nanosecond)) {
|
|
continue
|
|
}
|
|
|
|
// reset label values and hash for each row
|
|
clear(labelValues)
|
|
for col, arr := range arrays {
|
|
labelValues[col] = arr.Value(row)
|
|
}
|
|
r.aggregator.Add(labelValues)
|
|
}
|
|
}
|
|
}
|
|
|
|
if r.aggregator.NumOfPartitions() == 0 {
|
|
return nil, EOF // no values to aggregate & reached EOF
|
|
}
|
|
|
|
// TODO: schema is same for each read call when partitionBy is defined, we can create it once and reuse.
|
|
fields := make([]arrow.Field, 0, len(r.opts.partitionBy)+2)
|
|
fields = append(fields,
|
|
arrow.Field{
|
|
Name: types.ColumnNameBuiltinTimestamp,
|
|
Type: datatype.Arrow.Timestamp,
|
|
Nullable: false,
|
|
Metadata: datatype.ColumnMetadataBuiltinTimestamp,
|
|
},
|
|
arrow.Field{
|
|
Name: types.ColumnNameGeneratedValue,
|
|
Type: datatype.Arrow.Integer,
|
|
Nullable: false,
|
|
Metadata: datatype.ColumnMetadata(types.ColumnTypeGenerated, datatype.Loki.Integer),
|
|
},
|
|
)
|
|
|
|
for _, column := range r.opts.partitionBy {
|
|
columnExpr, ok := column.(*physical.ColumnExpr)
|
|
if !ok {
|
|
panic(fmt.Sprintf("invalid column expression type %T", column))
|
|
}
|
|
|
|
fields = append(fields, arrow.Field{
|
|
Name: columnExpr.Ref.Column,
|
|
Type: datatype.Arrow.String,
|
|
Nullable: true,
|
|
Metadata: datatype.ColumnMetadata(columnExpr.Ref.Type, datatype.Loki.String),
|
|
})
|
|
}
|
|
|
|
schema := arrow.NewSchema(fields, nil)
|
|
rb := array.NewRecordBuilder(memory.NewGoAllocator(), schema)
|
|
defer rb.Release()
|
|
|
|
ts, _ := arrow.TimestampFromTime(r.opts.endTs, arrow.Nanosecond)
|
|
for _, entry := range r.aggregator.entries {
|
|
rb.Field(0).(*array.TimestampBuilder).Append(ts)
|
|
rb.Field(1).(*array.Int64Builder).Append(entry.count)
|
|
|
|
for col, val := range entry.labelValues {
|
|
builder := rb.Field(col + 2) // offset by 2 as the first 2 fields are timestamp and value
|
|
if val == "" {
|
|
builder.(*array.StringBuilder).AppendNull()
|
|
} else {
|
|
builder.(*array.StringBuilder).Append(val)
|
|
}
|
|
}
|
|
}
|
|
|
|
return rb.NewRecord(), nil
|
|
}
|
|
|
|
// Value returns the current value in state.
|
|
func (r *RangeAggregationPipeline) Value() (arrow.Record, error) {
|
|
return r.state.Value()
|
|
}
|
|
|
|
// Close closes the resources of the pipeline.
|
|
// The implementation must close all the of the pipeline's inputs.
|
|
func (r *RangeAggregationPipeline) Close() {
|
|
// Release last batch
|
|
if r.state.batch != nil {
|
|
r.state.batch.Release()
|
|
}
|
|
|
|
for _, input := range r.inputs {
|
|
input.Close()
|
|
}
|
|
}
|
|
|
|
// Inputs returns the inputs of the pipeline.
|
|
func (r *RangeAggregationPipeline) Inputs() []Pipeline {
|
|
return r.inputs
|
|
}
|
|
|
|
// Transport returns the type of transport of the implementation.
|
|
func (r *RangeAggregationPipeline) Transport() Transport {
|
|
return Local
|
|
}
|
|
|
|
type partitionAggregator struct {
|
|
digest *xxhash.Digest // used to compute key for each partition
|
|
entries map[uint64]*partitionEntry
|
|
}
|
|
|
|
func newPartitionAggregator() *partitionAggregator {
|
|
return &partitionAggregator{
|
|
digest: xxhash.New(),
|
|
// TODO: estimate size during planning
|
|
entries: make(map[uint64]*partitionEntry),
|
|
}
|
|
}
|
|
|
|
// partitionEntry is the aggregation state of a single partition.
type partitionEntry struct {
	// count is the number of rows added to this partition.
	count int64
	// labelValues are the partition's label values, one per partitioning column.
	labelValues []string
}
|
|
|
|
func (a *partitionAggregator) Add(partitionLabelValues []string) {
|
|
a.digest.Reset()
|
|
|
|
for i, val := range partitionLabelValues {
|
|
if i > 0 {
|
|
_, _ = a.digest.Write([]byte{0}) // separator for label values
|
|
}
|
|
|
|
_, _ = a.digest.WriteString(val)
|
|
}
|
|
|
|
key := a.digest.Sum64()
|
|
if entry, ok := a.entries[key]; ok {
|
|
// TODO: handle hash collisions
|
|
entry.count++
|
|
} else {
|
|
// create a new slice since partitionLabelValues is reused by the calling code
|
|
labelValues := make([]string, len(partitionLabelValues))
|
|
for i, v := range partitionLabelValues {
|
|
// copy the value as this is backed by the arrow array data buffer.
|
|
// We could retain the record to avoid this copy, but that would hold
|
|
// all other columns in memory for as long as the query is evaluated.
|
|
labelValues[i] = strings.Clone(v)
|
|
}
|
|
|
|
// TODO: add limits on number of partitions
|
|
a.entries[key] = &partitionEntry{
|
|
labelValues: labelValues,
|
|
count: 1,
|
|
}
|
|
}
|
|
}
|
|
|
|
func (a *partitionAggregator) Reset() {
|
|
a.digest.Reset()
|
|
clear(a.entries)
|
|
}
|
|
|
|
func (a *partitionAggregator) NumOfPartitions() int {
|
|
return len(a.entries)
|
|
}
|
|
|