chore(engine): adds AST conversion and result builders for metric queries (#18166)

Authored by Ashwanth, committed via GitHub 6 months ago
parent b1fc685397
commit 646ebe81d8
  1. pkg/engine/compat.go (195)
  2. pkg/engine/compat_test.go (99)
  3. pkg/engine/engine.go (117)
  4. pkg/engine/executor/range_aggregation.go (8)
  5. pkg/engine/executor/range_aggregation_test.go (1)
  6. pkg/engine/internal/datatype/literal.go (4)
  7. pkg/engine/internal/util/time.go (9)
  8. pkg/engine/planner/logical/builder.go (2)
  9. pkg/engine/planner/logical/format_tree.go (10)
  10. pkg/engine/planner/logical/format_tree_test.go (4)
  11. pkg/engine/planner/logical/node_range_aggregate.go (8)
  12. pkg/engine/planner/logical/planner.go (175)
  13. pkg/engine/planner/logical/planner_test.go (68)
  14. pkg/engine/planner/physical/catalog.go (31)
  15. pkg/engine/planner/physical/catalog_test.go (6)
  16. pkg/engine/planner/physical/planner.go (114)
  17. pkg/engine/planner/physical/planner_test.go (18)
  18. pkg/engine/planner/physical/printer.go (5)
  19. pkg/engine/planner/physical/range_aggregate.go (2)

@@ -1,8 +1,15 @@
package engine
import (
"time"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/grafana/loki/v3/pkg/engine/internal/datatype"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logqlmodel"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
@@ -10,46 +17,198 @@ import (
"github.com/grafana/loki/pkg/push"
)
func newResultBuilder() *resultBuilder {
return &resultBuilder{
type ResultBuilder interface {
CollectRecord(arrow.Record)
Build() logqlmodel.Result
Len() int
SetStats(stats.Result)
}
var _ ResultBuilder = &streamsResultBuilder{}
var _ ResultBuilder = &vectorResultBuilder{}
func newStreamsResultBuilder() *streamsResultBuilder {
return &streamsResultBuilder{
streams: make(map[string]int),
}
}
type resultBuilder struct {
type streamsResultBuilder struct {
streams map[string]int
data logqlmodel.Streams
stats stats.Result
count int
}
func (b *resultBuilder) add(lbs labels.Labels, entry logproto.Entry) {
key := lbs.String()
idx, ok := b.streams[key]
if !ok {
idx = len(b.data)
b.streams[key] = idx
b.data = append(b.data, push.Stream{Labels: key})
func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) {
for row := range int(rec.NumRows()) {
stream, entry := b.collectRow(rec, row)
// Ignore rows that don't have stream labels, log line, or timestamp
if stream.Len() == 0 || entry.Line == "" || entry.Timestamp.Equal(time.Time{}) {
continue
}
// Add the entry to the result builder
key := stream.String()
idx, ok := b.streams[key]
if !ok {
idx = len(b.data)
b.streams[key] = idx
b.data = append(b.data, push.Stream{Labels: key})
}
b.data[idx].Entries = append(b.data[idx].Entries, entry)
b.count++
}
b.data[idx].Entries = append(b.data[idx].Entries, entry)
b.count++
}
func (b *resultBuilder) setStats(s stats.Result) {
func (b *streamsResultBuilder) collectRow(rec arrow.Record, i int) (labels.Labels, logproto.Entry) {
var entry logproto.Entry
lbs := labels.NewBuilder(labels.EmptyLabels())
metadata := labels.NewBuilder(labels.EmptyLabels())
for colIdx := range int(rec.NumCols()) {
col := rec.Column(colIdx)
colName := rec.ColumnName(colIdx)
// TODO(chaudum): We need to add metadata to columns to identify builtins, labels, metadata, and parsed.
field := rec.Schema().Field(colIdx)
colType, ok := field.Metadata.GetValue(types.MetadataKeyColumnType)
// Ignore column values that are NULL, invalid, or don't have a column type
if col.IsNull(i) || !col.IsValid(i) || !ok {
continue
}
// Extract line
if colName == types.ColumnNameBuiltinMessage && colType == types.ColumnTypeBuiltin.String() {
entry.Line = col.(*array.String).Value(i)
continue
}
// Extract timestamp
if colName == types.ColumnNameBuiltinTimestamp && colType == types.ColumnTypeBuiltin.String() {
entry.Timestamp = time.Unix(0, int64(col.(*array.Timestamp).Value(i)))
continue
}
// Extract label
if colType == types.ColumnTypeLabel.String() {
switch arr := col.(type) {
case *array.String:
lbs.Set(colName, arr.Value(i))
}
continue
}
// Extract metadata
if colType == types.ColumnTypeMetadata.String() {
switch arr := col.(type) {
case *array.String:
metadata.Set(colName, arr.Value(i))
}
continue
}
}
entry.StructuredMetadata = logproto.FromLabelsToLabelAdapters(metadata.Labels())
return lbs.Labels(), entry
}
func (b *streamsResultBuilder) SetStats(s stats.Result) {
b.stats = s
}
func (b *resultBuilder) build() logqlmodel.Result {
func (b *streamsResultBuilder) Build() logqlmodel.Result {
return logqlmodel.Result{
Data: b.data,
Statistics: b.stats,
}
}
func (b *resultBuilder) empty() logqlmodel.Result {
return logqlmodel.Result{}
func (b *streamsResultBuilder) Len() int {
return b.count
}
func (b *resultBuilder) len() int {
return b.count
type vectorResultBuilder struct {
data promql.Vector
lblsBuilder *labels.Builder
stats stats.Result
}
func newVectorResultBuilder() *vectorResultBuilder {
return &vectorResultBuilder{
data: promql.Vector{},
lblsBuilder: labels.NewBuilder(labels.EmptyLabels()),
}
}
func (b *vectorResultBuilder) CollectRecord(rec arrow.Record) {
for row := range int(rec.NumRows()) {
sample, ok := b.collectRow(rec, row)
if !ok {
continue
}
b.data = append(b.data, sample)
}
}
func (b *vectorResultBuilder) collectRow(rec arrow.Record, i int) (promql.Sample, bool) {
var sample promql.Sample
b.lblsBuilder.Reset(labels.EmptyLabels())
// TODO: we add a lot of overhead by reading row by row. Switch to vectorized conversion.
for colIdx := range int(rec.NumCols()) {
col := rec.Column(colIdx)
colName := rec.ColumnName(colIdx)
field := rec.Schema().Field(colIdx)
colDataType, ok := field.Metadata.GetValue(types.MetadataKeyColumnDataType)
if !ok {
return promql.Sample{}, false
}
switch colName {
case types.ColumnNameBuiltinTimestamp:
if col.IsNull(i) {
return promql.Sample{}, false
}
sample.T = int64(col.(*array.Timestamp).Value(i))
case types.ColumnNameGeneratedValue:
if col.IsNull(i) {
return promql.Sample{}, false
}
col, ok := col.(*array.Int64)
if !ok {
return promql.Sample{}, false
}
sample.F = float64(col.Value(i))
default:
// allow any string columns
if colDataType == datatype.String.String() {
b.lblsBuilder.Set(colName, col.(*array.String).Value(i))
}
}
}
sample.Metric = b.lblsBuilder.Labels()
return sample, true
}
func (b *vectorResultBuilder) Build() logqlmodel.Result {
return logqlmodel.Result{
Data: b.data,
Statistics: b.stats,
}
}
func (b *vectorResultBuilder) SetStats(s stats.Result) {
b.stats = s
}
func (b *vectorResultBuilder) Len() int {
return len(b.data)
}
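
Both result builders key off per-field Arrow schema metadata, not column names alone. For orientation, a minimal schema that streamsResultBuilder would accept might look like the sketch below, built with the metadata helpers the tests further down rely on (the label name "app" is illustrative):

    schema := arrow.NewSchema([]arrow.Field{
        // Builtin timestamp and message columns, tagged with their column type.
        {Name: types.ColumnNameBuiltinTimestamp, Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: datatype.ColumnMetadataBuiltinTimestamp},
        {Name: types.ColumnNameBuiltinMessage, Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.String)},
        // A stream label column; collectRow routes its values into the labels builder.
        {Name: "app", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeLabel, datatype.String)},
    }, nil)

Rows of such a record carry a non-empty stream, line, and timestamp, so they pass the guard at the top of CollectRecord.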

@@ -18,6 +18,7 @@ import (
"github.com/grafana/loki/v3/pkg/logqlmodel"
"github.com/grafana/loki/pkg/push"
"github.com/prometheus/prometheus/promql"
)
func createRecord(t *testing.T, schema *arrow.Schema, data [][]interface{}) arrow.Record {
@@ -54,7 +55,7 @@ func createRecord(t *testing.T, schema *arrow.Schema, data [][]interface{}) arro
return builder.NewRecord()
}
func TestConvertArrowRecordsToLokiResult(t *testing.T) {
func TestStreamsResultBuilder(t *testing.T) {
mdTypeLabel := datatype.ColumnMetadata(types.ColumnTypeLabel, datatype.String)
mdTypeMetadata := datatype.ColumnMetadata(types.ColumnTypeMetadata, datatype.String)
@@ -80,11 +81,11 @@ func TestConvertArrowRecordsToLokiResult(t *testing.T) {
pipeline := executor.NewBufferedPipeline(record)
defer pipeline.Close()
builder := newResultBuilder()
builder := newStreamsResultBuilder()
err := collectResult(context.Background(), pipeline, builder)
require.NoError(t, err)
require.Equal(t, 0, builder.len(), "expected no entries to be collected")
require.Equal(t, 0, builder.Len(), "expected no entries to be collected")
})
t.Run("fields without metadata are ignored", func(t *testing.T) {
@@ -109,11 +110,11 @@ func TestConvertArrowRecordsToLokiResult(t *testing.T) {
pipeline := executor.NewBufferedPipeline(record)
defer pipeline.Close()
builder := newResultBuilder()
builder := newStreamsResultBuilder()
err := collectResult(context.Background(), pipeline, builder)
require.NoError(t, err)
require.Equal(t, 0, builder.len(), "expected no entries to be collected")
require.Equal(t, 0, builder.Len(), "expected no entries to be collected")
})
t.Run("successful conversion of labels, log line, timestamp, and structured metadata ", func(t *testing.T) {
@@ -142,13 +143,13 @@ func TestConvertArrowRecordsToLokiResult(t *testing.T) {
pipeline := executor.NewBufferedPipeline(record)
defer pipeline.Close()
builder := newResultBuilder()
builder := newStreamsResultBuilder()
err := collectResult(context.Background(), pipeline, builder)
require.NoError(t, err)
require.Equal(t, 5, builder.len())
require.Equal(t, 5, builder.Len())
result := builder.build()
result := builder.Build()
require.Equal(t, 3, result.Data.(logqlmodel.Streams).Len())
expected := logqlmodel.Streams{
@@ -176,3 +177,85 @@ func TestConvertArrowRecordsToLokiResult(t *testing.T) {
require.Equal(t, expected, result.Data.(logqlmodel.Streams))
})
}
func TestVectorResultBuilder(t *testing.T) {
mdTypeString := datatype.ColumnMetadata(types.ColumnTypeAmbiguous, datatype.String)
t.Run("successful conversion of vector data", func(t *testing.T) {
schema := arrow.NewSchema(
[]arrow.Field{
{Name: types.ColumnNameBuiltinTimestamp, Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: datatype.ColumnMetadataBuiltinTimestamp},
{Name: types.ColumnNameGeneratedValue, Type: arrow.PrimitiveTypes.Int64, Metadata: datatype.ColumnMetadata(types.ColumnTypeGenerated, datatype.Integer)},
{Name: "instance", Type: arrow.BinaryTypes.String, Metadata: mdTypeString},
{Name: "job", Type: arrow.BinaryTypes.String, Metadata: mdTypeString},
},
nil,
)
data := [][]any{
{arrow.Timestamp(1620000000000000000), int64(42), "localhost:9090", "prometheus"},
{arrow.Timestamp(1620000000000000000), int64(23), "localhost:9100", "node-exporter"},
{arrow.Timestamp(1620000000000000000), int64(15), "localhost:9100", "prometheus"},
}
record := createRecord(t, schema, data)
defer record.Release()
pipeline := executor.NewBufferedPipeline(record)
defer pipeline.Close()
builder := newVectorResultBuilder()
err := collectResult(context.Background(), pipeline, builder)
require.NoError(t, err)
require.Equal(t, 3, builder.Len())
result := builder.Build()
vector := result.Data.(promql.Vector)
require.Equal(t, 3, len(vector))
// Check first sample
require.Equal(t, int64(1620000000000000000), vector[0].T)
require.Equal(t, 42.0, vector[0].F)
require.Equal(t, labels.FromStrings("instance", "localhost:9090", "job", "prometheus"), vector[0].Metric)
// Check second sample
require.Equal(t, int64(1620000000000000000), vector[1].T)
require.Equal(t, 23.0, vector[1].F)
require.Equal(t, labels.FromStrings("instance", "localhost:9100", "job", "node-exporter"), vector[1].Metric)
// Check third sample
require.Equal(t, int64(1620000000000000000), vector[2].T)
require.Equal(t, 15.0, vector[2].F)
require.Equal(t, labels.FromStrings("instance", "localhost:9100", "job", "prometheus"), vector[2].Metric)
})
// TODO(ashwanth): also enforce that grouping labels are all present?
t.Run("rows without timestamp or value are ignored", func(t *testing.T) {
schema := arrow.NewSchema(
[]arrow.Field{
{Name: types.ColumnNameBuiltinTimestamp, Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: datatype.ColumnMetadataBuiltinTimestamp},
{Name: types.ColumnNameGeneratedValue, Type: arrow.PrimitiveTypes.Int64, Metadata: datatype.ColumnMetadata(types.ColumnTypeGenerated, datatype.Integer)},
{Name: "instance", Type: arrow.BinaryTypes.String, Metadata: mdTypeString},
},
nil,
)
data := [][]interface{}{
{nil, int64(42), "localhost:9090"},
{arrow.Timestamp(1620000000000000000), nil, "localhost:9100"},
}
record := createRecord(t, schema, data)
defer record.Release()
pipeline := executor.NewBufferedPipeline(record)
defer pipeline.Close()
builder := newVectorResultBuilder()
err := collectResult(context.Background(), pipeline, builder)
require.NoError(t, err)
require.Equal(t, 0, builder.Len(), "expected no samples to be collected")
})
}

@@ -3,25 +3,22 @@ package engine
import (
"context"
"encoding/base64"
"fmt"
"strings"
"time"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/thanos-io/objstore"
"github.com/grafana/loki/v3/pkg/dataobj/metastore"
"github.com/grafana/loki/v3/pkg/engine/executor"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/engine/planner/logical"
"github.com/grafana/loki/v3/pkg/engine/planner/physical"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/logqlmodel"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
utillog "github.com/grafana/loki/v3/pkg/util/log"
@@ -75,8 +72,6 @@ func (e *QueryEngine) Query(params logql.Params) logql.Query {
func (e *QueryEngine) Execute(ctx context.Context, params logql.Params) (logqlmodel.Result, error) {
start := time.Now()
builder := newResultBuilder()
logger := utillog.WithContext(ctx, e.logger)
logger = log.With(logger, "query", params.QueryString(), "shard", strings.Join(params.Shards(), ","), "engine", "v2")
@@ -85,7 +80,7 @@ func (e *QueryEngine) Execute(ctx context.Context, params logql.Params) (logqlmo
if err != nil {
level.Warn(logger).Log("msg", "failed to create logical plan", "err", err)
e.metrics.subqueries.WithLabelValues(statusNotImplemented).Inc()
return builder.empty(), ErrNotSupported
return logqlmodel.Result{}, ErrNotSupported
}
e.metrics.logicalPlanning.Observe(time.Since(t).Seconds())
durLogicalPlanning := time.Since(t)
@@ -98,19 +93,19 @@ func (e *QueryEngine) Execute(ctx context.Context, params logql.Params) (logqlmo
t = time.Now() // start stopwatch for physical planning
statsCtx, ctx := stats.NewContext(ctx)
executionContext := physical.NewContext(ctx, e.metastore, params.Start(), params.End())
planner := physical.NewPlanner(executionContext)
catalog := physical.NewMetastoreCatalog(ctx, e.metastore)
planner := physical.NewPlanner(physical.NewContext(params.Start(), params.End()), catalog)
plan, err := planner.Build(logicalPlan)
if err != nil {
level.Warn(logger).Log("msg", "failed to create physical plan", "err", err)
e.metrics.subqueries.WithLabelValues(statusFailure).Inc()
return builder.empty(), ErrNotSupported
return logqlmodel.Result{}, ErrNotSupported
}
plan, err = planner.Optimize(plan)
if err != nil {
level.Warn(logger).Log("msg", "failed to optimize physical plan", "err", err)
e.metrics.subqueries.WithLabelValues(statusFailure).Inc()
return builder.empty(), ErrNotSupported
return logqlmodel.Result{}, ErrNotSupported
}
e.metrics.physicalPlanning.Observe(time.Since(t).Seconds())
durPhysicalPlanning := time.Since(t)
@@ -133,12 +128,24 @@ func (e *QueryEngine) Execute(ctx context.Context, params logql.Params) (logqlmo
pipeline := executor.Run(ctx, cfg, plan)
defer pipeline.Close()
var builder ResultBuilder
switch params.GetExpression().(type) {
case syntax.LogSelectorExpr:
builder = newStreamsResultBuilder()
case syntax.SampleExpr:
// assume instant query since logical planning would fail for range queries.
builder = newVectorResultBuilder()
default:
// should never happen as we already check the expression type in the logical planner
panic(fmt.Sprintf("failed to execute. Invalid expression type (%T)", params.GetExpression()))
}
if err := collectResult(ctx, pipeline, builder); err != nil {
e.metrics.subqueries.WithLabelValues(statusFailure).Inc()
return builder.empty(), err
return logqlmodel.Result{}, err
}
builder.setStats(statsCtx.Result(time.Since(start), 0, builder.len()))
builder.SetStats(statsCtx.Result(time.Since(start), 0, builder.Len()))
e.metrics.subqueries.WithLabelValues(statusSuccess).Inc()
e.metrics.execution.Observe(time.Since(t).Seconds())
@@ -151,10 +158,10 @@ func (e *QueryEngine) Execute(ctx context.Context, params logql.Params) (logqlmo
"duration_execution", durExecution,
)
return builder.build(), nil
return builder.Build(), nil
}
func collectResult(_ context.Context, pipeline executor.Pipeline, result *resultBuilder) error {
func collectResult(_ context.Context, pipeline executor.Pipeline, builder ResultBuilder) error {
for {
if err := pipeline.Read(); err != nil {
if errors.Is(err, executor.EOF) {
@@ -162,86 +169,18 @@ func collectResult(_ context.Context, pipeline executor.Pipeline, result *result
}
return err
}
if err := collectRecord(pipeline, result); err != nil {
rec, err := pipeline.Value()
if err != nil {
return err
}
}
return nil
}
func collectRecord(pipeline executor.Pipeline, result *resultBuilder) error {
rec, err := pipeline.Value()
if err != nil {
return err
}
defer rec.Release()
for rowIdx := range int(rec.NumRows()) {
collectRow(rec, rowIdx, result)
builder.CollectRecord(rec)
rec.Release()
}
return nil
}
func collectRow(rec arrow.Record, i int, result *resultBuilder) {
var entry logproto.Entry
lbs := labels.NewBuilder(labels.EmptyLabels())
metadata := labels.NewBuilder(labels.EmptyLabels())
for colIdx := range int(rec.NumCols()) {
col := rec.Column(colIdx)
colName := rec.ColumnName(colIdx)
// TODO(chaudum): We need to add metadata to columns to identify builtins, labels, metadata, and parsed.
field := rec.Schema().Field(colIdx)
colType, ok := field.Metadata.GetValue(types.MetadataKeyColumnType)
// Ignore column values that are NULL, invalid, or don't have a column type
if col.IsNull(i) || !col.IsValid(i) || !ok {
continue
}
// Extract line
if colName == types.ColumnNameBuiltinMessage && colType == types.ColumnTypeBuiltin.String() {
entry.Line = col.(*array.String).Value(i)
continue
}
// Extract timestamp
if colName == types.ColumnNameBuiltinTimestamp && colType == types.ColumnTypeBuiltin.String() {
entry.Timestamp = time.Unix(0, int64(col.(*array.Timestamp).Value(i)))
continue
}
// Extract label
if colType == types.ColumnTypeLabel.String() {
switch arr := col.(type) {
case *array.String:
lbs.Set(colName, arr.Value(i))
}
continue
}
// Extract metadata
if colType == types.ColumnTypeMetadata.String() {
switch arr := col.(type) {
case *array.String:
metadata.Set(colName, arr.Value(i))
}
continue
}
}
entry.StructuredMetadata = logproto.FromLabelsToLabelAdapters(metadata.Labels())
stream := lbs.Labels()
// Ignore rows that don't have stream labels, log line, or timestamp
if stream.Len() == 0 || entry.Line == "" || entry.Timestamp.Equal(time.Time{}) {
return
}
// Finally, add newly created entry to builder
result.add(stream, entry)
}
var _ logql.Engine = (*QueryEngine)(nil)
// queryAdapter dispatches query execution to the wrapped engine.
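
For orientation, the builder selection above follows the type of the parsed expression. A rough sketch of the dispatch using the public parser entry point (illustrative, not the engine's code):

    expr, err := syntax.ParseExpr(`sum by (level) (count_over_time({env="prod"}[1m]))`)
    if err != nil {
        return logqlmodel.Result{}, err
    }
    switch expr.(type) {
    case syntax.LogSelectorExpr:
        // e.g. `{env="prod"} |= "metric.go"` -> newStreamsResultBuilder()
    case syntax.SampleExpr:
        // the query above -> newVectorResultBuilder()
    }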

@@ -20,10 +20,10 @@ type rangeAggregationOptions struct {
partitionBy []physical.ColumnExpression
// start and end timestamps are equal for instant queries.
startTs time.Time // start timestamp of the query
endTs time.Time // end timestamp of the query
rangeInterval time.Duration // range interval
step *time.Duration // step used for range queries, nil for instant queries
startTs time.Time // start timestamp of the query
endTs time.Time // end timestamp of the query
rangeInterval time.Duration // range interval
step time.Duration // step used for range queries
}
// RangeAggregationPipeline is a pipeline that performs aggregations over a time window.

@@ -81,7 +81,6 @@ func TestRangeAggregationPipeline(t *testing.T) {
startTs: now,
endTs: now,
rangeInterval: 10 * time.Minute,
step: nil, // instant query
}
pipeline, err := NewRangeAggregationPipeline([]Pipeline{input1, input2}, expressionEvaluator{}, opts)

@@ -4,6 +4,8 @@ import (
"fmt"
"strconv"
"time"
"github.com/grafana/loki/v3/pkg/engine/internal/util"
)
type NullLiteral struct {
@@ -126,7 +128,7 @@ type TimestampLiteral struct {
// String implements Literal.
func (t *TimestampLiteral) String() string {
return time.Unix(0, t.v).UTC().Format(time.RFC3339Nano)
return util.FormatTimeRFC3339Nano(time.Unix(0, t.v))
}
// Type implements Literal.

@@ -0,0 +1,9 @@
package util
import "time"
// FormatTimeRFC3339Nano formats the given time in RFC3339Nano format in UTC.
// Use this everywhere in the engine for consistent timestamp formatting.
func FormatTimeRFC3339Nano(t time.Time) string {
return t.UTC().Format(time.RFC3339Nano)
}
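
A quick illustration of the UTC normalization (the printed value follows from the epoch arithmetic):

    // Prints "2021-05-03T00:00:00Z" regardless of the local time zone.
    fmt.Println(util.FormatTimeRFC3339Nano(time.Unix(1620000000, 0)))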

@@ -57,7 +57,7 @@ func (b *Builder) RangeAggregation(
partitionBy []ColumnRef,
operation types.RangeAggregationType,
startTS, endTS time.Time,
step *time.Duration,
step time.Duration,
rangeInterval time.Duration,
) *Builder {
return &Builder{

@@ -4,6 +4,7 @@ import (
"fmt"
"io"
"github.com/grafana/loki/v3/pkg/engine/internal/util"
"github.com/grafana/loki/v3/pkg/engine/planner/internal/tree"
)
@@ -135,15 +136,12 @@ func (t *treeFormatter) convertRangeAggregation(r *RangeAggregation) *tree.Node
properties := []tree.Property{
tree.NewProperty("table", false, r.Table.Name()),
tree.NewProperty("operation", false, r.Operation),
tree.NewProperty("start_ts", false, r.Start),
tree.NewProperty("end_ts", false, r.End),
tree.NewProperty("start_ts", false, util.FormatTimeRFC3339Nano(r.Start)),
tree.NewProperty("end_ts", false, util.FormatTimeRFC3339Nano(r.End)),
tree.NewProperty("step", false, r.Step),
tree.NewProperty("range", false, r.RangeInterval),
}
if r.Step != nil {
properties = append(properties, tree.NewProperty("step", false, r.Step))
}
if len(r.PartitionBy) > 0 {
partitionBy := make([]any, len(r.PartitionBy))
for i := range r.PartitionBy {

@@ -128,7 +128,7 @@ func TestFormatRangeAggregationQuery(t *testing.T) {
types.RangeAggregationTypeCount,
time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC), // Start Time
time.Date(1970, 1, 1, 1, 0, 0, 0, time.UTC), // End Time
nil,
time.Minute,
time.Minute*5, // Range
)
@@ -143,7 +143,7 @@ func TestFormatRangeAggregationQuery(t *testing.T) {
t.Logf("Actual output:\n%s", actual)
expected := `
RangeAggregation <%5> table=%4 operation=count start_ts=1970-01-01 00:00:00 +0000 UTC end_ts=1970-01-01 01:00:00 +0000 UTC range=5m0s partition_by=(ambiguous.label1, ambiguous.label2)
RangeAggregation <%5> table=%4 operation=count start_ts=1970-01-01T00:00:00Z end_ts=1970-01-01T01:00:00Z step=1m0s range=5m0s partition_by=(ambiguous.label1, ambiguous.label2)
ColumnRef column=label1 type=ambiguous
ColumnRef column=label2 type=ambiguous
SELECT <%4> table=%2 predicate=%3

@@ -5,6 +5,7 @@ import (
"time"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/engine/internal/util"
"github.com/grafana/loki/v3/pkg/engine/planner/schema"
)
@@ -22,7 +23,7 @@ type RangeAggregation struct {
Operation types.RangeAggregationType // The type of aggregation operation to perform.
Start time.Time
End time.Time
Step *time.Duration // optional for instant queries
Step time.Duration
RangeInterval time.Duration
}
@@ -41,10 +42,7 @@ func (r *RangeAggregation) Name() string {
// String returns the disassembled SSA form of the RangeAggregation instruction.
func (r *RangeAggregation) String() string {
props := fmt.Sprintf("operation=%s, start_ts=%s, end_ts=%s, range=%s", r.Operation, r.Start, r.End, r.RangeInterval)
if r.Step != nil {
props += fmt.Sprintf(", step=%s", r.Step)
}
props := fmt.Sprintf("operation=%s, start_ts=%s, end_ts=%s, step=%s, range=%s", r.Operation, util.FormatTimeRFC3339Nano(r.Start), util.FormatTimeRFC3339Nano(r.End), r.Step, r.RangeInterval)
if len(r.PartitionBy) > 0 {
partitionBy := ""

@@ -18,50 +18,74 @@ var errUnimplemented = errors.New("query contains unimplemented features")
// BuildPlan converts a LogQL query represented as [logql.Params] into a logical [Plan].
// It may return an error as second argument in case the traversal of the AST of the query fails.
func BuildPlan(query logql.Params) (*Plan, error) {
var selector Value
var predicates []Value
func BuildPlan(params logql.Params) (*Plan, error) {
var (
builder *Builder
err error
)
// TODO(chaudum): Implement a Walk function that can return an error
var err error
switch e := params.GetExpression().(type) {
case syntax.LogSelectorExpr:
builder, err = buildPlanForLogQuery(e, params, false, 0)
case syntax.SampleExpr:
builder, err = buildPlanForSampleQuery(e, params)
default:
err = fmt.Errorf("unexpected expression type (%T)", e)
}
if err != nil {
return nil, fmt.Errorf("failed to convert AST into logical plan: %w", err)
}
expr := query.GetExpression()
return builder.ToPlan()
}
// buildPlanForLogQuery builds logical plan operations by traversing [syntax.LogSelectorExpr]
// isMetricQuery should be set to true if this expr is encountered when processing a [syntax.SampleExpr].
// rangeInterval should be set to a non-zero value if the query contains [$range].
func buildPlanForLogQuery(expr syntax.LogSelectorExpr, params logql.Params, isMetricQuery bool, rangeInterval time.Duration) (*Builder, error) {
var (
err error
selector Value
predicates []Value
)
// TODO(chaudum): Implement a Walk function that can return an error
expr.Walk(func(e syntax.Expr) bool {
switch e := e.(type) {
case syntax.SampleExpr:
err = errUnimplemented
return false // do not traverse children
case *syntax.LineParserExpr, *syntax.LogfmtParserExpr, *syntax.LogfmtExpressionParserExpr, *syntax.JSONExpressionParserExpr:
err = errUnimplemented
return false // do not traverse children
case *syntax.LineFmtExpr, *syntax.LabelFmtExpr:
err = errUnimplemented
return false // do not traverse children
case *syntax.KeepLabelsExpr, *syntax.DropLabelsExpr:
err = errUnimplemented
return false // do not traverse children
case *syntax.PipelineExpr:
// [PipelineExpr] is a container for other expressions, nothing to do here.
return true
case *syntax.MatchersExpr:
selector = convertLabelMatchers(e.Matchers())
return true
case *syntax.LineFilterExpr:
predicates = append(predicates, convertLineFilterExpr(e))
// We do not want to traverse the AST further down, because line filter expressions can be nested,
// which would lead to multiple predicates of the same expression.
return false
return false // do not traverse children
case *syntax.LabelFilterExpr:
if val, innerErr := convertLabelFilter(e.LabelFilterer); innerErr != nil {
err = innerErr
} else {
predicates = append(predicates, val)
}
return true
case *syntax.LineParserExpr, *syntax.LogfmtParserExpr, *syntax.LogfmtExpressionParserExpr, *syntax.JSONExpressionParserExpr,
*syntax.LineFmtExpr, *syntax.LabelFmtExpr,
*syntax.KeepLabelsExpr, *syntax.DropLabelsExpr:
err = errUnimplemented
return false // do not traverse children
default:
err = errUnimplemented
return false // do not traverse children
}
return true
})
if err != nil {
return nil, fmt.Errorf("failed to convert AST into logical plan: %w", err)
return nil, err
}
shard, err := parseShards(query.Shards())
shard, err := parseShards(params.Shards())
if err != nil {
return nil, fmt.Errorf("failed to parse shard: %w", err)
}
@@ -74,15 +98,19 @@ func BuildPlan(query logql.Params) (*Plan, error) {
},
)
// SORT -> SortMerge
direction := query.Direction()
ascending := direction == logproto.FORWARD
builder = builder.Sort(*timestampColumnRef(), ascending, false)
// Metric queries currently do not expect the logs to be sorted by timestamp.
if !isMetricQuery {
// SORT -> SortMerge
direction := params.Direction()
ascending := direction == logproto.FORWARD
builder = builder.Sort(*timestampColumnRef(), ascending, false)
}
// SELECT -> Filter
start := query.Start()
end := query.End()
for _, value := range convertQueryRangeToPredicates(start, end) {
start := params.Start()
end := params.End()
// Extend the search window by rangeInterval so that entries belonging to the [$range] interval are included.
for _, value := range convertQueryRangeToPredicates(start.Add(-rangeInterval), end) {
builder = builder.Select(value)
}
@@ -90,12 +118,89 @@ func BuildPlan(query logql.Params) (*Plan, error) {
builder = builder.Select(value)
}
// LIMIT -> Limit
limit := query.Limit()
builder = builder.Limit(0, limit)
// Metric queries do not apply a limit.
if !isMetricQuery {
// LIMIT -> Limit
limit := params.Limit()
builder = builder.Limit(0, limit)
}
return builder, nil
}
func buildPlanForSampleQuery(e syntax.SampleExpr, params logql.Params) (*Builder, error) {
if params.Step() > 0 {
return nil, fmt.Errorf("only instant metric queries are supported: %w", errUnimplemented)
}
var (
err error
rangeAggType types.RangeAggregationType
rangeInterval time.Duration
vecAggType types.VectorAggregationType
groupBy []ColumnRef
)
e.Walk(func(e syntax.Expr) bool {
switch e := e.(type) {
case *syntax.RangeAggregationExpr:
// only count operation is supported for range aggregation.
// offsets are not yet supported.
if e.Operation != syntax.OpRangeTypeCount || e.Left.Offset != 0 {
err = errUnimplemented
return false
}
rangeAggType = types.RangeAggregationTypeCount
rangeInterval = e.Left.Interval
return false // do not traverse log range query
case *syntax.VectorAggregationExpr:
// only sum operation is supported for vector aggregation.
// grouping by at least one label is required.
if e.Operation != syntax.OpTypeSum ||
e.Grouping == nil || len(e.Grouping.Groups) == 0 || e.Grouping.Without {
err = errUnimplemented
return false
}
vecAggType = types.VectorAggregationTypeSum
groupBy = make([]ColumnRef, 0, len(e.Grouping.Groups))
for _, group := range e.Grouping.Groups {
groupBy = append(groupBy, *NewColumnRef(group, types.ColumnTypeAmbiguous))
}
return true
default:
err = errUnimplemented
return false // do not traverse children
}
})
if err != nil {
return nil, err
}
if rangeAggType == types.RangeAggregationTypeInvalid || vecAggType == types.VectorAggregationTypeInvalid {
return nil, errUnimplemented
}
logSelectorExpr, err := e.Selector()
if err != nil {
return nil, err
}
builder, err := buildPlanForLogQuery(logSelectorExpr, params, true, rangeInterval)
if err != nil {
return nil, err
}
builder = builder.RangeAggregation(
nil, rangeAggType, params.Start(), params.End(), params.Step(), rangeInterval,
).VectorAggregation(groupBy, vecAggType)
plan, err := builder.ToPlan()
return plan, err
return builder, nil
}
func convertLabelMatchers(matchers []*labels.Matcher) Value {
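
To make the rangeInterval extension concrete: in the metric-query test below, the query starts at 01:00:00Z with a [5m] range, so the earliest entry that can contribute to the aggregation sits at 00:55:00Z. A sketch of the arithmetic:

    start := time.Date(1970, 1, 1, 1, 0, 0, 0, time.UTC) // params.Start()
    rangeInterval := 5 * time.Minute                      // from count_over_time(...[5m])
    scanFrom := start.Add(-rangeInterval)                 // 1970-01-01T00:55:00Z, the GTE bound (%5) in the expected plan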

@@ -14,10 +14,11 @@ import (
)
type query struct {
statement string
start, end int64
direction logproto.Direction
limit uint32
statement string
start, end int64
step, interval time.Duration
direction logproto.Direction
limit uint32
}
// Direction implements logql.Params.
@@ -72,7 +73,7 @@ func (q *query) Shards() []string {
// Step implements logql.Params.
func (q *query) Step() time.Duration {
panic("unimplemented")
return q.step
}
var _ logql.Params = (*query)(nil)
@@ -120,6 +121,41 @@ RETURN %20
t.Logf("\n%s\n", sb.String())
}
func TestConvertAST_MetricQuery_Success(t *testing.T) {
q := &query{
statement: `sum by (level) (count_over_time({cluster="prod", namespace=~"loki-.*"} |= "metric.go"[5m]))`,
start: 3600,
end: 7200,
interval: 5 * time.Minute,
}
logicalPlan, err := BuildPlan(q)
require.NoError(t, err)
t.Logf("\n%s\n", logicalPlan.String())
expected := `%1 = EQ label.cluster "prod"
%2 = MATCH_RE label.namespace "loki-.*"
%3 = AND %1 %2
%4 = MAKETABLE [selector=%3, shard=0_of_1]
%5 = GTE builtin.timestamp 1970-01-01T00:55:00Z
%6 = SELECT %4 [predicate=%5]
%7 = LT builtin.timestamp 1970-01-01T02:00:00Z
%8 = SELECT %6 [predicate=%7]
%9 = MATCH_STR builtin.message "metric.go"
%10 = SELECT %8 [predicate=%9]
%11 = RANGE_AGGREGATION %10 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s]
%12 = VECTOR_AGGREGATION %11 [operation=sum, group_by=(ambiguous.level)]
RETURN %12
`
require.Equal(t, expected, logicalPlan.String())
var sb strings.Builder
PrintTree(&sb, logicalPlan.Value())
t.Logf("\n%s\n", sb.String())
}
func TestCanExecuteQuery(t *testing.T) {
for _, tt := range []struct {
statement string
@@ -179,7 +215,27 @@ func TestCanExecuteQuery(t *testing.T) {
statement: `{env="prod"} |= "metric.go" | retry > 2`,
},
{
statement: `sum(rate({env="prod"}[1m]))`,
statement: `sum by (level) (count_over_time({env="prod"}[1m]))`,
expected: true,
},
{
// both vector and range aggregation are required
statement: `count_over_time({env="prod"}[1m])`,
},
{
// group by labels are required
statement: `sum(count_over_time({env="prod"}[1m]))`,
},
{
// rate is not supported
statement: `sum by (level) (rate({env="prod"}[1m]))`,
},
{
// max is not supported
statement: `max by (level) (count_over_time({env="prod"}[1m]))`,
},
{
statement: `sum by (level) (count_over_time({env="prod"}[1m] offset 5m))`,
},
} {
t.Run(tt.statement, func(t *testing.T) {

@@ -41,35 +41,32 @@ type Catalog interface {
// ResolveDataObj returns a list of data object paths,
// a list of stream IDs for each data object,
// and a list of sections for each data object
ResolveDataObj(Expression) ([]DataObjLocation, [][]int64, [][]int, error)
ResolveDataObjWithShard(Expression, ShardInfo) ([]DataObjLocation, [][]int64, [][]int, error)
ResolveDataObj(Expression, time.Time, time.Time) ([]DataObjLocation, [][]int64, [][]int, error)
ResolveDataObjWithShard(Expression, ShardInfo, time.Time, time.Time) ([]DataObjLocation, [][]int64, [][]int, error)
}
// Context is the default implementation of [Catalog].
type Context struct {
ctx context.Context
metastore metastore.Metastore
from, through time.Time
// MetastoreCatalog is the default implementation of [Catalog].
type MetastoreCatalog struct {
ctx context.Context
metastore metastore.Metastore
}
// NewContext creates a new instance of [Context] for query planning.
func NewContext(ctx context.Context, ms metastore.Metastore, from, through time.Time) *Context {
return &Context{
// NewMetastoreCatalog creates a new instance of [MetastoreCatalog] for query planning.
func NewMetastoreCatalog(ctx context.Context, ms metastore.Metastore) *MetastoreCatalog {
return &MetastoreCatalog{
ctx: ctx,
metastore: ms,
from: from,
through: through,
}
}
// ResolveDataObj resolves DataObj locations and streams IDs based on a given
// [Expression]. The expression is required to be a (tree of) [BinaryExpression]
// with a [ColumnExpression] on the left and a [LiteralExpression] on the right.
func (c *Context) ResolveDataObj(selector Expression) ([]DataObjLocation, [][]int64, [][]int, error) {
return c.ResolveDataObjWithShard(selector, noShard)
func (c *MetastoreCatalog) ResolveDataObj(selector Expression, from, through time.Time) ([]DataObjLocation, [][]int64, [][]int, error) {
return c.ResolveDataObjWithShard(selector, noShard, from, through)
}
func (c *Context) ResolveDataObjWithShard(selector Expression, shard ShardInfo) ([]DataObjLocation, [][]int64, [][]int, error) {
func (c *MetastoreCatalog) ResolveDataObjWithShard(selector Expression, shard ShardInfo, from, through time.Time) ([]DataObjLocation, [][]int64, [][]int, error) {
if c.metastore == nil {
return nil, nil, nil, errors.New("no metastore to resolve objects")
}
@@ -79,7 +76,7 @@ func (c *Context) ResolveDataObjWithShard(selector Expression, shard ShardInfo)
return nil, nil, nil, fmt.Errorf("failed to convert selector expression into matchers: %w", err)
}
paths, streamIDs, numSections, err := c.metastore.StreamIDs(c.ctx, c.from, c.through, matchers...)
paths, streamIDs, numSections, err := c.metastore.StreamIDs(c.ctx, from, through, matchers...)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to resolve data object locations: %w", err)
}
@@ -187,4 +184,4 @@ func convertBinaryOp(t types.BinaryOp) (labels.MatchType, error) {
return ty, nil
}
var _ Catalog = (*Context)(nil)
var _ Catalog = (*MetastoreCatalog)(nil)
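
Since the catalog no longer captures the query window at construction time, each resolution call carries its own (possibly range-extended) bounds. A sketch of the call shape, with the selector expression elided (names as introduced in this diff):

    catalog := NewMetastoreCatalog(ctx, ms)
    objects, streamIDs, sections, err := catalog.ResolveDataObj(selector, from.Add(-rangeInterval), through)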

@@ -10,7 +10,7 @@ import (
"github.com/grafana/loki/v3/pkg/engine/internal/types"
)
func TestContext_ConvertLiteral(t *testing.T) {
func TestCatalog_ConvertLiteral(t *testing.T) {
tests := []struct {
expr Expression
want string
@@ -63,7 +63,7 @@ func TestContext_ConvertLiteral(t *testing.T) {
}
}
func TestContext_ConvertColumnRef(t *testing.T) {
func TestCatalog_ConvertColumnRef(t *testing.T) {
tests := []struct {
expr Expression
want string
@@ -108,7 +108,7 @@ func TestContext_ConvertColumnRef(t *testing.T) {
}
}
func TestContext_ExpressionToMatchers(t *testing.T) {
func TestCatalog_ExpressionToMatchers(t *testing.T) {
tests := []struct {
expr Expression
want []*labels.Matcher

@@ -3,13 +3,62 @@ package physical
import (
"errors"
"fmt"
"time"
"github.com/grafana/loki/v3/pkg/engine/planner/logical"
)
// Internal state of the planner
type state struct {
direction SortOrder
// Context carries planning state that needs to be propagated down the plan tree.
// This enables each branch to have independent context, which is essential for complex queries:
// - Binary operations with different [$range] intervals, offsets or @timestamp.
//
// Propagation:
// - Each process() method passes context to children.
// - A node can clone and modify the context (e.g. RangeAggregation sets rangeInterval); the modified copy is inherited by all of its descendants.
type Context struct {
from time.Time
through time.Time
rangeInterval time.Duration
direction SortOrder
}
func NewContext(from, through time.Time) *Context {
return &Context{
from: from,
through: through,
}
}
func (pc *Context) Clone() *Context {
return &Context{
from: pc.from,
through: pc.through,
rangeInterval: pc.rangeInterval,
direction: pc.direction,
}
}
func (pc *Context) WithDirection(direction SortOrder) *Context {
cloned := pc.Clone()
cloned.direction = direction
return cloned
}
func (pc *Context) WithRangeInterval(rangeInterval time.Duration) *Context {
cloned := pc.Clone()
cloned.rangeInterval = rangeInterval
return cloned
}
func (pc *Context) WithTimeRange(from, through time.Time) *Context {
cloned := pc.Clone()
cloned.from = from
cloned.through = through
return cloned
}
func (pc *Context) GetResolveTimeRange() (from, through time.Time) {
return pc.from.Add(-pc.rangeInterval), pc.through
}
// Planner creates an executable physical plan from a logical plan.
@@ -21,14 +70,17 @@ type state struct {
// a) Push down the limit of the Limit node to the DataObjScan nodes.
// b) Push down the predicate from the Filter node to the DataObjScan nodes.
type Planner struct {
context *Context
catalog Catalog
plan *Plan
state state
}
// NewPlanner creates a new planner instance with the given context.
func NewPlanner(catalog Catalog) *Planner {
return &Planner{catalog: catalog}
func NewPlanner(ctx *Context, catalog Catalog) *Planner {
return &Planner{
context: ctx,
catalog: catalog,
}
}
// Build converts a given logical plan into a physical plan and returns an error if the conversion fails.
@@ -38,7 +90,7 @@ func (p *Planner) Build(lp *logical.Plan) (*Plan, error) {
for _, inst := range lp.Instructions {
switch inst := inst.(type) {
case *logical.Return:
nodes, err := p.process(inst.Value)
nodes, err := p.process(inst.Value, p.context)
if err != nil {
return nil, err
}
@@ -54,7 +106,6 @@ func (p *Planner) Build(lp *logical.Plan) (*Plan, error) {
// reset resets the internal state of the planner
func (p *Planner) reset() {
p.plan = &Plan{}
p.state = state{}
}
// Convert a predicate from an [logical.Instruction] into an [Expression].
@@ -81,32 +132,33 @@ func (p *Planner) convertPredicate(inst logical.Value) Expression {
}
// Convert a [logical.Instruction] into one or multiple [Node]s.
func (p *Planner) process(inst logical.Value) ([]Node, error) {
func (p *Planner) process(inst logical.Value, ctx *Context) ([]Node, error) {
switch inst := inst.(type) {
case *logical.MakeTable:
return p.processMakeTable(inst)
return p.processMakeTable(inst, ctx)
case *logical.Select:
return p.processSelect(inst)
return p.processSelect(inst, ctx)
case *logical.Sort:
return p.processSort(inst)
return p.processSort(inst, ctx)
case *logical.Limit:
return p.processLimit(inst)
return p.processLimit(inst, ctx)
case *logical.RangeAggregation:
return p.processRangeAggregation(inst)
return p.processRangeAggregation(inst, ctx)
case *logical.VectorAggregation:
return p.processVectorAggregation(inst)
return p.processVectorAggregation(inst, ctx)
}
return nil, nil
}
// Convert [logical.MakeTable] into one or more [DataObjScan] nodes.
func (p *Planner) processMakeTable(lp *logical.MakeTable) ([]Node, error) {
func (p *Planner) processMakeTable(lp *logical.MakeTable, ctx *Context) ([]Node, error) {
shard, ok := lp.Shard.(*logical.ShardInfo)
if !ok {
return nil, fmt.Errorf("invalid shard, got %T", lp.Shard)
}
objects, streams, sections, err := p.catalog.ResolveDataObjWithShard(p.convertPredicate(lp.Selector), ShardInfo(*shard))
from, through := ctx.GetResolveTimeRange()
objects, streams, sections, err := p.catalog.ResolveDataObjWithShard(p.convertPredicate(lp.Selector), ShardInfo(*shard), from, through)
if err != nil {
return nil, err
}
@@ -117,7 +169,7 @@ func (p *Planner) processMakeTable(lp *logical.MakeTable) ([]Node, error) {
Location: objects[i],
StreamIDs: streams[i],
Sections: sections[i],
Direction: p.state.direction, // apply direction from previously visited Sort node
Direction: ctx.direction, // apply direction from previously visited Sort node
}
p.plan.addNode(node)
nodes = append(nodes, node)
@@ -126,12 +178,12 @@ func (p *Planner) processMakeTable(lp *logical.MakeTable) ([]Node, error) {
}
// Convert [logical.Select] into one [Filter] node.
func (p *Planner) processSelect(lp *logical.Select) ([]Node, error) {
func (p *Planner) processSelect(lp *logical.Select, ctx *Context) ([]Node, error) {
node := &Filter{
Predicates: []Expression{p.convertPredicate(lp.Predicate)},
}
p.plan.addNode(node)
children, err := p.process(lp.Table)
children, err := p.process(lp.Table, ctx)
if err != nil {
return nil, err
}
@@ -144,7 +196,7 @@ func (p *Planner) processSelect(lp *logical.Select) ([]Node, error) {
}
// Convert [logical.Sort] into one [SortMerge] node.
func (p *Planner) processSort(lp *logical.Sort) ([]Node, error) {
func (p *Planner) processSort(lp *logical.Sort, ctx *Context) ([]Node, error) {
order := DESC
if lp.Ascending {
order = ASC
@@ -153,12 +205,14 @@ func (p *Planner) processSort(lp *logical.Sort) ([]Node, error) {
Column: &ColumnExpr{Ref: lp.Column.Ref},
Order: order,
}
p.state.direction = order
p.plan.addNode(node)
children, err := p.process(lp.Table)
children, err := p.process(lp.Table, ctx.WithDirection(order))
if err != nil {
return nil, err
}
for i := range children {
if err := p.plan.addEdge(Edge{Parent: node, Child: children[i]}); err != nil {
return nil, err
@@ -168,13 +222,13 @@ func (p *Planner) processSort(lp *logical.Sort) ([]Node, error) {
}
// Convert [logical.Limit] into one [Limit] node.
func (p *Planner) processLimit(lp *logical.Limit) ([]Node, error) {
func (p *Planner) processLimit(lp *logical.Limit, ctx *Context) ([]Node, error) {
node := &Limit{
Skip: lp.Skip,
Fetch: lp.Fetch,
}
p.plan.addNode(node)
children, err := p.process(lp.Table)
children, err := p.process(lp.Table, ctx)
if err != nil {
return nil, err
}
@@ -186,7 +240,7 @@ func (p *Planner) processLimit(lp *logical.Limit) ([]Node, error) {
return []Node{node}, nil
}
func (p *Planner) processRangeAggregation(r *logical.RangeAggregation) ([]Node, error) {
func (p *Planner) processRangeAggregation(r *logical.RangeAggregation, ctx *Context) ([]Node, error) {
partitionBy := make([]ColumnExpression, len(r.PartitionBy))
for i, col := range r.PartitionBy {
partitionBy[i] = &ColumnExpr{Ref: col.Ref}
@@ -201,10 +255,12 @@ func (p *Planner) processRangeAggregation(r *logical.RangeAggregation) ([]Node,
Step: r.Step,
}
p.plan.addNode(node)
children, err := p.process(r.Table)
children, err := p.process(r.Table, ctx.WithRangeInterval(r.RangeInterval))
if err != nil {
return nil, err
}
for i := range children {
if err := p.plan.addEdge(Edge{Parent: node, Child: children[i]}); err != nil {
return nil, err
@@ -214,7 +270,7 @@ func (p *Planner) processRangeAggregation(r *logical.RangeAggregation) ([]Node,
}
// Convert [logical.VectorAggregation] into one [VectorAggregation] node.
func (p *Planner) processVectorAggregation(lp *logical.VectorAggregation) ([]Node, error) {
func (p *Planner) processVectorAggregation(lp *logical.VectorAggregation, ctx *Context) ([]Node, error) {
groupBy := make([]ColumnExpression, len(lp.GroupBy))
for i, col := range lp.GroupBy {
groupBy[i] = &ColumnExpr{Ref: col.Ref}
@@ -225,7 +281,7 @@ func (p *Planner) processVectorAggregation(lp *logical.VectorAggregation) ([]Nod
Operation: lp.Operation,
}
p.plan.addNode(node)
children, err := p.process(lp.Table)
children, err := p.process(lp.Table, ctx)
if err != nil {
return nil, err
}
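
The planner Context is copy-on-write: a node clones it for its subtree, and siblings keep their own view. A sketch of the propagation described above (values hypothetical):

    ctx := NewContext(from, through)                      // root context covering the query window
    branch := ctx.WithRangeInterval(5 * time.Minute)      // set by processRangeAggregation for its children
    scanFrom, scanThrough := branch.GetResolveTimeRange() // (from-5m, through), used to resolve DataObjScan nodes
    // ctx.GetResolveTimeRange() still returns (from, through) for other branches.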

@@ -21,12 +21,12 @@ type catalog struct {
}
// ResolveDataObj implements Catalog.
func (c *catalog) ResolveDataObj(e Expression) ([]DataObjLocation, [][]int64, [][]int, error) {
return c.ResolveDataObjWithShard(e, noShard)
func (c *catalog) ResolveDataObj(e Expression, from, through time.Time) ([]DataObjLocation, [][]int64, [][]int, error) {
return c.ResolveDataObjWithShard(e, noShard, from, through)
}
// ResolveDataObjWithShard implements Catalog.
func (c *catalog) ResolveDataObjWithShard(_ Expression, shard ShardInfo) ([]DataObjLocation, [][]int64, [][]int, error) {
func (c *catalog) ResolveDataObjWithShard(_ Expression, shard ShardInfo, _, _ time.Time) ([]DataObjLocation, [][]int64, [][]int, error) {
paths := make([]string, 0, len(c.streamsByObject))
streams := make([][]int64, 0, len(c.streamsByObject))
sections := make([]int, 0, len(c.streamsByObject))
@@ -99,7 +99,7 @@ func TestMockCatalog(t *testing.T) {
},
} {
t.Run("shard "+tt.shard.String(), func(t *testing.T) {
paths, streams, sections, _ := catalog.ResolveDataObjWithShard(nil, tt.shard)
paths, streams, sections, _ := catalog.ResolveDataObjWithShard(nil, tt.shard, time.Now(), time.Now())
require.Equal(t, tt.expPaths, paths)
require.Equal(t, tt.expStreams, streams)
require.Equal(t, tt.expSections, sections)
@@ -142,7 +142,7 @@ func TestPlanner_ConvertMaketable(t *testing.T) {
"obj5": {streamIDs: []int64{4, 5}, sections: 2},
},
}
planner := NewPlanner(catalog)
planner := NewPlanner(NewContext(time.Now(), time.Now()), catalog)
streamSelector := &logical.BinOp{
Left: logical.NewColumnRef("app", types.ColumnTypeLabel),
@@ -197,7 +197,7 @@ func TestPlanner_ConvertMaketable(t *testing.T) {
Shard: tt.shard,
}
planner.reset()
nodes, err := planner.processMakeTable(relation)
nodes, err := planner.processMakeTable(relation, NewContext(time.Now(), time.Now()))
require.NoError(t, err)
require.Equal(t, tt.expPaths, locations(t, nodes))
@@ -245,7 +245,7 @@ func TestPlanner_Convert(t *testing.T) {
"obj2": {streamIDs: []int64{3, 4}, sections: 1},
},
}
planner := NewPlanner(catalog)
planner := NewPlanner(NewContext(time.Now(), time.Now()), catalog)
physicalPlan, err := planner.Build(logicalPlan)
require.NoError(t, err)
@@ -284,7 +284,7 @@ func TestPlanner_Convert_RangeAggregations(t *testing.T) {
types.RangeAggregationTypeCount,
time.Date(2023, 10, 1, 0, 0, 0, 0, time.UTC), // Start Time
time.Date(2023, 10, 1, 1, 0, 0, 0, time.UTC), // End Time
nil, // Step
0, // Step
time.Minute*5, // Range
)
@@ -297,7 +297,7 @@ func TestPlanner_Convert_RangeAggregations(t *testing.T) {
"obj2": {streamIDs: []int64{3, 4}, sections: 1},
},
}
planner := NewPlanner(catalog)
planner := NewPlanner(NewContext(time.Now(), time.Now()), catalog)
physicalPlan, err := planner.Build(logicalPlan)
require.NoError(t, err)

@@ -62,13 +62,10 @@ func toTreeNode(n Node) *tree.Node {
tree.NewProperty("operation", false, node.Operation),
tree.NewProperty("start", false, node.Start),
tree.NewProperty("end", false, node.End),
tree.NewProperty("step", false, node.Step),
tree.NewProperty("range", false, node.Range),
}
if node.Step != nil {
properties = append(properties, tree.NewProperty("step", false, node.Step))
}
if len(node.PartitionBy) > 0 {
properties = append(properties, tree.NewProperty("partition_by", true, toAnySlice(node.PartitionBy)...))
}

@@ -16,7 +16,7 @@ type RangeAggregation struct {
Operation types.RangeAggregationType
Start time.Time
End time.Time
Step *time.Duration // optional for instant queries
Step time.Duration // zero for instant queries
Range time.Duration
}
