diff --git a/pkg/engine/compat.go b/pkg/engine/compat.go
index 0769142ea2..a9ff76a83b 100644
--- a/pkg/engine/compat.go
+++ b/pkg/engine/compat.go
@@ -1,8 +1,15 @@
 package engine
 
 import (
+	"time"
+
+	"github.com/apache/arrow-go/v18/arrow"
+	"github.com/apache/arrow-go/v18/arrow/array"
 	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/promql"
+
+	"github.com/grafana/loki/v3/pkg/engine/internal/datatype"
+	"github.com/grafana/loki/v3/pkg/engine/internal/types"
 	"github.com/grafana/loki/v3/pkg/logproto"
 	"github.com/grafana/loki/v3/pkg/logqlmodel"
 	"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
@@ -10,46 +17,198 @@ import (
 	"github.com/grafana/loki/pkg/push"
 )
 
-func newResultBuilder() *resultBuilder {
-	return &resultBuilder{
+type ResultBuilder interface {
+	CollectRecord(arrow.Record)
+	Build() logqlmodel.Result
+	Len() int
+	SetStats(stats.Result)
+}
+
+var _ ResultBuilder = &streamsResultBuilder{}
+var _ ResultBuilder = &vectorResultBuilder{}
+
+func newStreamsResultBuilder() *streamsResultBuilder {
+	return &streamsResultBuilder{
 		streams: make(map[string]int),
 	}
 }
 
-type resultBuilder struct {
+type streamsResultBuilder struct {
 	streams map[string]int
 	data    logqlmodel.Streams
 	stats   stats.Result
 	count   int
 }
 
-func (b *resultBuilder) add(lbs labels.Labels, entry logproto.Entry) {
-	key := lbs.String()
-	idx, ok := b.streams[key]
-	if !ok {
-		idx = len(b.data)
-		b.streams[key] = idx
-		b.data = append(b.data, push.Stream{Labels: key})
+func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) {
+	for row := range int(rec.NumRows()) {
+		stream, entry := b.collectRow(rec, row)
+
+		// Ignore rows that don't have stream labels, log line, or timestamp
+		if stream.Len() == 0 || entry.Line == "" || entry.Timestamp.Equal(time.Time{}) {
+			continue
+		}
+
+		// Add the entry to the result builder
+		key := stream.String()
+		idx, ok := b.streams[key]
+		if !ok {
+			idx = len(b.data)
+			b.streams[key] = idx
+			b.data = append(b.data, push.Stream{Labels: key})
+		}
+		b.data[idx].Entries = append(b.data[idx].Entries, entry)
+		b.count++
 	}
-	b.data[idx].Entries = append(b.data[idx].Entries, entry)
-	b.count++
 }
 
-func (b *resultBuilder) setStats(s stats.Result) {
+func (b *streamsResultBuilder) collectRow(rec arrow.Record, i int) (labels.Labels, logproto.Entry) {
+	var entry logproto.Entry
+	lbs := labels.NewBuilder(labels.EmptyLabels())
+	metadata := labels.NewBuilder(labels.EmptyLabels())
+
+	for colIdx := range int(rec.NumCols()) {
+		col := rec.Column(colIdx)
+		colName := rec.ColumnName(colIdx)
+
+		// TODO(chaudum): We need to add metadata to columns to identify builtins, labels, metadata, and parsed.
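+		// Until then, the column type (builtin, label, metadata) is read from the
+		// Arrow field metadata under types.MetadataKeyColumnType; it decides whether
+		// a value becomes the log line, the timestamp, a stream label, or structured metadata.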
+		field := rec.Schema().Field(colIdx)
+		colType, ok := field.Metadata.GetValue(types.MetadataKeyColumnType)
+
+		// Ignore column values that are NULL or invalid or don't have a column type
+		if col.IsNull(i) || !col.IsValid(i) || !ok {
+			continue
+		}
+
+		// Extract line
+		if colName == types.ColumnNameBuiltinMessage && colType == types.ColumnTypeBuiltin.String() {
+			entry.Line = col.(*array.String).Value(i)
+			continue
+		}
+
+		// Extract timestamp
+		if colName == types.ColumnNameBuiltinTimestamp && colType == types.ColumnTypeBuiltin.String() {
+			entry.Timestamp = time.Unix(0, int64(col.(*array.Timestamp).Value(i)))
+			continue
+		}
+
+		// Extract label
+		if colType == types.ColumnTypeLabel.String() {
+			switch arr := col.(type) {
+			case *array.String:
+				lbs.Set(colName, arr.Value(i))
+			}
+			continue
+		}
+
+		// Extract metadata
+		if colType == types.ColumnTypeMetadata.String() {
+			switch arr := col.(type) {
+			case *array.String:
+				metadata.Set(colName, arr.Value(i))
+			}
+			continue
+		}
+	}
+	entry.StructuredMetadata = logproto.FromLabelsToLabelAdapters(metadata.Labels())
+
+	return lbs.Labels(), entry
+}
+
+func (b *streamsResultBuilder) SetStats(s stats.Result) {
 	b.stats = s
 }
 
-func (b *resultBuilder) build() logqlmodel.Result {
+func (b *streamsResultBuilder) Build() logqlmodel.Result {
 	return logqlmodel.Result{
 		Data:       b.data,
 		Statistics: b.stats,
 	}
 }
 
-func (b *resultBuilder) empty() logqlmodel.Result {
-	return logqlmodel.Result{}
+func (b *streamsResultBuilder) Len() int {
+	return b.count
 }
 
-func (b *resultBuilder) len() int {
-	return b.count
+type vectorResultBuilder struct {
+	data        promql.Vector
+	lblsBuilder *labels.Builder
+	stats       stats.Result
+}
+
+func newVectorResultBuilder() *vectorResultBuilder {
+	return &vectorResultBuilder{
+		data:        promql.Vector{},
+		lblsBuilder: labels.NewBuilder(labels.EmptyLabels()),
+	}
+}
+
+func (b *vectorResultBuilder) CollectRecord(rec arrow.Record) {
+	for row := range int(rec.NumRows()) {
+		sample, ok := b.collectRow(rec, row)
+		if !ok {
+			continue
+		}
+
+		b.data = append(b.data, sample)
+	}
+}
+
+func (b *vectorResultBuilder) collectRow(rec arrow.Record, i int) (promql.Sample, bool) {
+	var sample promql.Sample
+	b.lblsBuilder.Reset(labels.EmptyLabels())
+
+	// TODO: we add a lot of overhead by reading row by row. Switch to vectorized conversion.
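+	// A row becomes a sample only when both the builtin timestamp and the generated
+	// value column are present and non-null; string-typed columns are collected as labels.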
+	for colIdx := range int(rec.NumCols()) {
+		col := rec.Column(colIdx)
+		colName := rec.ColumnName(colIdx)
+
+		field := rec.Schema().Field(colIdx)
+		colDataType, ok := field.Metadata.GetValue(types.MetadataKeyColumnDataType)
+		if !ok {
+			return promql.Sample{}, false
+		}
+
+		switch colName {
+		case types.ColumnNameBuiltinTimestamp:
+			if col.IsNull(i) {
+				return promql.Sample{}, false
+			}
+
+			sample.T = int64(col.(*array.Timestamp).Value(i))
+		case types.ColumnNameGeneratedValue:
+			if col.IsNull(i) {
+				return promql.Sample{}, false
+			}
+
+			col, ok := col.(*array.Int64)
+			if !ok {
+				return promql.Sample{}, false
+			}
+			sample.F = float64(col.Value(i))
+		default:
+			// allow any string columns
+			if colDataType == datatype.String.String() {
+				b.lblsBuilder.Set(colName, col.(*array.String).Value(i))
+			}
+		}
+	}
+
+	sample.Metric = b.lblsBuilder.Labels()
+	return sample, true
+}
+
+func (b *vectorResultBuilder) Build() logqlmodel.Result {
+	return logqlmodel.Result{
+		Data:       b.data,
+		Statistics: b.stats,
+	}
+}
+
+func (b *vectorResultBuilder) SetStats(s stats.Result) {
+	b.stats = s
+}
+
+func (b *vectorResultBuilder) Len() int {
+	return len(b.data)
 }
diff --git a/pkg/engine/engine_test.go b/pkg/engine/compat_test.go
similarity index 64%
rename from pkg/engine/engine_test.go
rename to pkg/engine/compat_test.go
index d2abb7273c..50f4ebe85a 100644
--- a/pkg/engine/engine_test.go
+++ b/pkg/engine/compat_test.go
@@ -18,6 +18,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/logqlmodel"
 
 	"github.com/grafana/loki/pkg/push"
+	"github.com/prometheus/prometheus/promql"
 )
 
 func createRecord(t *testing.T, schema *arrow.Schema, data [][]interface{}) arrow.Record {
@@ -54,7 +55,7 @@ func createRecord(t *testing.T, schema *arrow.Schema, data [][]interface{}) arro
 	return builder.NewRecord()
 }
 
-func TestConvertArrowRecordsToLokiResult(t *testing.T) {
+func TestStreamsResultBuilder(t *testing.T) {
 	mdTypeLabel := datatype.ColumnMetadata(types.ColumnTypeLabel, datatype.String)
 	mdTypeMetadata := datatype.ColumnMetadata(types.ColumnTypeMetadata, datatype.String)
 
@@ -80,11 +81,11 @@
 		pipeline := executor.NewBufferedPipeline(record)
 		defer pipeline.Close()
 
-		builder := newResultBuilder()
+		builder := newStreamsResultBuilder()
 		err := collectResult(context.Background(), pipeline, builder)
 
 		require.NoError(t, err)
-		require.Equal(t, 0, builder.len(), "expected no entries to be collected")
+		require.Equal(t, 0, builder.Len(), "expected no entries to be collected")
 	})
 
 	t.Run("fields without metadata are ignored", func(t *testing.T) {
@@ -109,11 +110,11 @@
 		pipeline := executor.NewBufferedPipeline(record)
 		defer pipeline.Close()
 
-		builder := newResultBuilder()
+		builder := newStreamsResultBuilder()
 		err := collectResult(context.Background(), pipeline, builder)
 
 		require.NoError(t, err)
-		require.Equal(t, 0, builder.len(), "expected no entries to be collected")
+		require.Equal(t, 0, builder.Len(), "expected no entries to be collected")
 	})
 
 	t.Run("successful conversion of labels, log line, timestamp, and structured metadata ", func(t *testing.T) {
@@ -142,13 +143,13 @@
 		pipeline := executor.NewBufferedPipeline(record)
 		defer pipeline.Close()
 
-		builder := newResultBuilder()
+		builder := newStreamsResultBuilder()
 		err := collectResult(context.Background(), pipeline, builder)
 
 		require.NoError(t, err)
-		require.Equal(t, 5, builder.len())
+		require.Equal(t, 5, builder.Len())
 
-		result := builder.build()
+		result := builder.Build()
 		require.Equal(t, 3, result.Data.(logqlmodel.Streams).Len())
 
 		expected := logqlmodel.Streams{
@@ -176,3 +177,85 @@
 		require.Equal(t, expected, result.Data.(logqlmodel.Streams))
 	})
 }
+
+func TestVectorResultBuilder(t *testing.T) {
+	mdTypeString := datatype.ColumnMetadata(types.ColumnTypeAmbiguous, datatype.String)
+
+	t.Run("successful conversion of vector data", func(t *testing.T) {
+		schema := arrow.NewSchema(
+			[]arrow.Field{
+				{Name: types.ColumnNameBuiltinTimestamp, Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: datatype.ColumnMetadataBuiltinTimestamp},
+				{Name: types.ColumnNameGeneratedValue, Type: arrow.PrimitiveTypes.Int64, Metadata: datatype.ColumnMetadata(types.ColumnTypeGenerated, datatype.Integer)},
+				{Name: "instance", Type: arrow.BinaryTypes.String, Metadata: mdTypeString},
+				{Name: "job", Type: arrow.BinaryTypes.String, Metadata: mdTypeString},
+			},
+			nil,
+		)
+
+		data := [][]any{
+			{arrow.Timestamp(1620000000000000000), int64(42), "localhost:9090", "prometheus"},
+			{arrow.Timestamp(1620000000000000000), int64(23), "localhost:9100", "node-exporter"},
+			{arrow.Timestamp(1620000000000000000), int64(15), "localhost:9100", "prometheus"},
+		}
+
+		record := createRecord(t, schema, data)
+		defer record.Release()
+
+		pipeline := executor.NewBufferedPipeline(record)
+		defer pipeline.Close()
+
+		builder := newVectorResultBuilder()
+		err := collectResult(context.Background(), pipeline, builder)
+
+		require.NoError(t, err)
+		require.Equal(t, 3, builder.Len())
+
+		result := builder.Build()
+		vector := result.Data.(promql.Vector)
+		require.Equal(t, 3, len(vector))
+
+		// Check first sample
+		require.Equal(t, int64(1620000000000000000), vector[0].T)
+		require.Equal(t, 42.0, vector[0].F)
+		require.Equal(t, labels.FromStrings("instance", "localhost:9090", "job", "prometheus"), vector[0].Metric)
+
+		// Check second sample
+		require.Equal(t, int64(1620000000000000000), vector[1].T)
+		require.Equal(t, 23.0, vector[1].F)
+		require.Equal(t, labels.FromStrings("instance", "localhost:9100", "job", "node-exporter"), vector[1].Metric)
+
+		// Check third sample
+		require.Equal(t, int64(1620000000000000000), vector[2].T)
+		require.Equal(t, 15.0, vector[2].F)
+		require.Equal(t, labels.FromStrings("instance", "localhost:9100", "job", "prometheus"), vector[2].Metric)
+	})
+
+	// TODO:(ashwanth) also enforce grouping labels are all present?
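+	// Rows missing either builtin column must be dropped instead of producing partial samples.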
+	t.Run("rows without timestamp or value are ignored", func(t *testing.T) {
+		schema := arrow.NewSchema(
+			[]arrow.Field{
+				{Name: types.ColumnNameBuiltinTimestamp, Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: datatype.ColumnMetadataBuiltinTimestamp},
+				{Name: types.ColumnNameGeneratedValue, Type: arrow.PrimitiveTypes.Int64, Metadata: datatype.ColumnMetadata(types.ColumnTypeGenerated, datatype.Integer)},
+				{Name: "instance", Type: arrow.BinaryTypes.String, Metadata: mdTypeString},
+			},
+			nil,
+		)
+
+		data := [][]interface{}{
+			{nil, int64(42), "localhost:9090"},
+			{arrow.Timestamp(1620000000000000000), nil, "localhost:9100"},
+		}
+
+		record := createRecord(t, schema, data)
+		defer record.Release()
+
+		pipeline := executor.NewBufferedPipeline(record)
+		defer pipeline.Close()
+
+		builder := newVectorResultBuilder()
+		err := collectResult(context.Background(), pipeline, builder)
+
+		require.NoError(t, err)
+		require.Equal(t, 0, builder.Len(), "expected no samples to be collected")
+	})
+}
diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go
index b6b6775a1c..226c7fc835 100644
--- a/pkg/engine/engine.go
+++ b/pkg/engine/engine.go
@@ -3,25 +3,22 @@ package engine
 import (
 	"context"
 	"encoding/base64"
+	"fmt"
 	"strings"
 	"time"
 
-	"github.com/apache/arrow-go/v18/arrow"
-	"github.com/apache/arrow-go/v18/arrow/array"
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
-	"github.com/prometheus/prometheus/model/labels"
 	"github.com/thanos-io/objstore"
 
 	"github.com/grafana/loki/v3/pkg/dataobj/metastore"
 	"github.com/grafana/loki/v3/pkg/engine/executor"
-	"github.com/grafana/loki/v3/pkg/engine/internal/types"
 	"github.com/grafana/loki/v3/pkg/engine/planner/logical"
 	"github.com/grafana/loki/v3/pkg/engine/planner/physical"
-	"github.com/grafana/loki/v3/pkg/logproto"
 	"github.com/grafana/loki/v3/pkg/logql"
+	"github.com/grafana/loki/v3/pkg/logql/syntax"
 	"github.com/grafana/loki/v3/pkg/logqlmodel"
 	"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
 	utillog "github.com/grafana/loki/v3/pkg/util/log"
@@ -75,8 +72,6 @@ func (e *QueryEngine) Query(params logql.Params) logql.Query {
 func (e *QueryEngine) Execute(ctx context.Context, params logql.Params) (logqlmodel.Result, error) {
 	start := time.Now()
 
-	builder := newResultBuilder()
-
 	logger := utillog.WithContext(ctx, e.logger)
 	logger = log.With(logger, "query", params.QueryString(), "shard", strings.Join(params.Shards(), ","), "engine", "v2")
 
@@ -85,7 +80,7 @@ func (e *QueryEngine) Execute(ctx context.Context, params logql.Params) (logqlmo
 	if err != nil {
 		level.Warn(logger).Log("msg", "failed to create logical plan", "err", err)
 		e.metrics.subqueries.WithLabelValues(statusNotImplemented).Inc()
-		return builder.empty(), ErrNotSupported
+		return logqlmodel.Result{}, ErrNotSupported
 	}
 	e.metrics.logicalPlanning.Observe(time.Since(t).Seconds())
 	durLogicalPlanning := time.Since(t)
@@ -98,19 +93,19 @@ func (e *QueryEngine) Execute(ctx context.Context, params logql.Params) (logqlmo
 	t = time.Now() // start stopwatch for physical planning
 	statsCtx, ctx := stats.NewContext(ctx)
-	executionContext := physical.NewContext(ctx, e.metastore, params.Start(), params.End())
-	planner := physical.NewPlanner(executionContext)
+	catalog := physical.NewMetastoreCatalog(ctx, e.metastore)
+	planner := physical.NewPlanner(physical.NewContext(params.Start(), params.End()), catalog)
 	plan, err := planner.Build(logicalPlan)
 	if err != nil {
 		level.Warn(logger).Log("msg", "failed to create physical plan", "err", err)
 		e.metrics.subqueries.WithLabelValues(statusFailure).Inc()
-		return builder.empty(), ErrNotSupported
+		return logqlmodel.Result{}, ErrNotSupported
 	}
 	plan, err = planner.Optimize(plan)
 	if err != nil {
 		level.Warn(logger).Log("msg", "failed to optimize physical plan", "err", err)
 		e.metrics.subqueries.WithLabelValues(statusFailure).Inc()
-		return builder.empty(), ErrNotSupported
+		return logqlmodel.Result{}, ErrNotSupported
 	}
 	e.metrics.physicalPlanning.Observe(time.Since(t).Seconds())
 	durPhysicalPlanning := time.Since(t)
@@ -133,12 +128,24 @@
 	pipeline := executor.Run(ctx, cfg, plan)
 	defer pipeline.Close()
 
+	var builder ResultBuilder
+	switch params.GetExpression().(type) {
+	case syntax.LogSelectorExpr:
+		builder = newStreamsResultBuilder()
+	case syntax.SampleExpr:
+		// assume instant query since logical planning would fail for range queries.
+		builder = newVectorResultBuilder()
+	default:
+		// should never happen as we already check the expression type in the logical planner
+		panic(fmt.Sprintf("failed to execute: invalid expression type (%T)", params.GetExpression()))
+	}
+
 	if err := collectResult(ctx, pipeline, builder); err != nil {
 		e.metrics.subqueries.WithLabelValues(statusFailure).Inc()
-		return builder.empty(), err
+		return logqlmodel.Result{}, err
 	}
 
-	builder.setStats(statsCtx.Result(time.Since(start), 0, builder.len()))
+	builder.SetStats(statsCtx.Result(time.Since(start), 0, builder.Len()))
 
 	e.metrics.subqueries.WithLabelValues(statusSuccess).Inc()
 	e.metrics.execution.Observe(time.Since(t).Seconds())
@@ -151,10 +158,10 @@
 		"duration_execution", durExecution,
 	)
 
-	return builder.build(), nil
+	return builder.Build(), nil
 }
 
-func collectResult(_ context.Context, pipeline executor.Pipeline, result *resultBuilder) error {
+func collectResult(_ context.Context, pipeline executor.Pipeline, builder ResultBuilder) error {
 	for {
 		if err := pipeline.Read(); err != nil {
 			if errors.Is(err, executor.EOF) {
@@ -162,86 +169,18 @@
 			}
 			return err
 		}
-		if err := collectRecord(pipeline, result); err != nil {
+
+		rec, err := pipeline.Value()
+		if err != nil {
 			return err
 		}
-	}
-	return nil
-}
 
-func collectRecord(pipeline executor.Pipeline, result *resultBuilder) error {
-	rec, err := pipeline.Value()
-	if err != nil {
-		return err
-	}
-	defer rec.Release()
-	for rowIdx := range int(rec.NumRows()) {
-		collectRow(rec, rowIdx, result)
+		builder.CollectRecord(rec)
+		rec.Release()
 	}
 	return nil
 }
 
-func collectRow(rec arrow.Record, i int, result *resultBuilder) {
-	var entry logproto.Entry
-	lbs := labels.NewBuilder(labels.EmptyLabels())
-	metadata := labels.NewBuilder(labels.EmptyLabels())
-
-	for colIdx := range int(rec.NumCols()) {
-		col := rec.Column(colIdx)
-		colName := rec.ColumnName(colIdx)
-
-		// TODO(chaudum): We need to add metadata to columns to identify builtins, labels, metadata, and parsed.
-		field := rec.Schema().Field(colIdx)
-		colType, ok := field.Metadata.GetValue(types.MetadataKeyColumnType)
-
-		// Ignore column values that are NULL or invalid or don't have a column typ
-		if col.IsNull(i) || !col.IsValid(i) || !ok {
-			continue
-		}
-
-		// Extract line
-		if colName == types.ColumnNameBuiltinMessage && colType == types.ColumnTypeBuiltin.String() {
-			entry.Line = col.(*array.String).Value(i)
-			continue
-		}
-
-		// Extract timestamp
-		if colName == types.ColumnNameBuiltinTimestamp && colType == types.ColumnTypeBuiltin.String() {
-			entry.Timestamp = time.Unix(0, int64(col.(*array.Timestamp).Value(i)))
-			continue
-		}
-
-		// Extract label
-		if colType == types.ColumnTypeLabel.String() {
-			switch arr := col.(type) {
-			case *array.String:
-				lbs.Set(colName, arr.Value(i))
-			}
-			continue
-		}
-
-		// Extract metadata
-		if colType == types.ColumnTypeMetadata.String() {
-			switch arr := col.(type) {
-			case *array.String:
-				metadata.Set(colName, arr.Value(i))
-			}
-			continue
-		}
-	}
-	entry.StructuredMetadata = logproto.FromLabelsToLabelAdapters(metadata.Labels())
-
-	stream := lbs.Labels()
-
-	// Ignore rows that don't have stream labels, log line, or timestamp
-	if stream.Len() == 0 || entry.Line == "" || entry.Timestamp.Equal(time.Time{}) {
-		return
-	}
-
-	// Finally, add newly created entry to builder
-	result.add(stream, entry)
-}
-
 var _ logql.Engine = (*QueryEngine)(nil)
 
 // queryAdapter dispatches query execution to the wrapped engine.
diff --git a/pkg/engine/executor/range_aggregation.go b/pkg/engine/executor/range_aggregation.go
index 878abde0ec..02e8c6cbd6 100644
--- a/pkg/engine/executor/range_aggregation.go
+++ b/pkg/engine/executor/range_aggregation.go
@@ -20,10 +20,10 @@ type rangeAggregationOptions struct {
 	partitionBy []physical.ColumnExpression
 
 	// start and end timestamps are equal for instant queries.
-	startTs       time.Time      // start timestamp of the query
-	endTs         time.Time      // end timestamp of the query
-	rangeInterval time.Duration  // range interval
-	step          *time.Duration // step used for range queries, nil for instant queries
+	startTs       time.Time     // start timestamp of the query
+	endTs         time.Time     // end timestamp of the query
+	rangeInterval time.Duration // range interval
+	step          time.Duration // step used for range queries
 }
 
 // RangeAggregationPipeline is a pipeline that performs aggregations over a time window.
diff --git a/pkg/engine/executor/range_aggregation_test.go b/pkg/engine/executor/range_aggregation_test.go
index 0ef487378b..db0a386477 100644
--- a/pkg/engine/executor/range_aggregation_test.go
+++ b/pkg/engine/executor/range_aggregation_test.go
@@ -81,7 +81,6 @@ func TestRangeAggregationPipeline(t *testing.T) {
 		startTs:       now,
 		endTs:         now,
 		rangeInterval: 10 * time.Minute,
-		step:          nil, // instant query
 	}
 
 	pipeline, err := NewRangeAggregationPipeline([]Pipeline{input1, input2}, expressionEvaluator{}, opts)
diff --git a/pkg/engine/internal/datatype/literal.go b/pkg/engine/internal/datatype/literal.go
index 6dc3b668b6..67d3cd0178 100644
--- a/pkg/engine/internal/datatype/literal.go
+++ b/pkg/engine/internal/datatype/literal.go
@@ -4,6 +4,8 @@ import (
 	"fmt"
 	"strconv"
 	"time"
+
+	"github.com/grafana/loki/v3/pkg/engine/internal/util"
 )
 
 type NullLiteral struct {
@@ -126,7 +128,7 @@ type TimestampLiteral struct {
 
 // String implements Literal.
 func (t *TimestampLiteral) String() string {
-	return time.Unix(0, t.v).UTC().Format(time.RFC3339Nano)
+	return util.FormatTimeRFC3339Nano(time.Unix(0, t.v))
 }
 
 // Type implements Literal.
diff --git a/pkg/engine/internal/util/time.go b/pkg/engine/internal/util/time.go
new file mode 100644
index 0000000000..5c6fa1962b
--- /dev/null
+++ b/pkg/engine/internal/util/time.go
@@ -0,0 +1,9 @@
+package util
+
+import "time"
+
+// FormatTimeRFC3339Nano formats the given time in RFC3339Nano format in UTC.
+// Use this everywhere in the engine for consistent timestamp formatting.
+func FormatTimeRFC3339Nano(t time.Time) string {
+	return t.UTC().Format(time.RFC3339Nano)
+}
diff --git a/pkg/engine/planner/logical/builder.go b/pkg/engine/planner/logical/builder.go
index 669cf88a1f..3dcc56ba25 100644
--- a/pkg/engine/planner/logical/builder.go
+++ b/pkg/engine/planner/logical/builder.go
@@ -57,7 +57,7 @@ func (b *Builder) RangeAggregation(
 	partitionBy []ColumnRef,
 	operation types.RangeAggregationType,
 	startTS, endTS time.Time,
-	step *time.Duration,
+	step time.Duration,
 	rangeInterval time.Duration,
 ) *Builder {
 	return &Builder{
diff --git a/pkg/engine/planner/logical/format_tree.go b/pkg/engine/planner/logical/format_tree.go
index 1f5f97f462..276af5867e 100644
--- a/pkg/engine/planner/logical/format_tree.go
+++ b/pkg/engine/planner/logical/format_tree.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"io"
 
+	"github.com/grafana/loki/v3/pkg/engine/internal/util"
 	"github.com/grafana/loki/v3/pkg/engine/planner/internal/tree"
 )
 
@@ -135,15 +136,12 @@ func (t *treeFormatter) convertRangeAggregation(r *RangeAggregation) *tree.Node
 	properties := []tree.Property{
 		tree.NewProperty("table", false, r.Table.Name()),
 		tree.NewProperty("operation", false, r.Operation),
-		tree.NewProperty("start_ts", false, r.Start),
-		tree.NewProperty("end_ts", false, r.End),
+		tree.NewProperty("start_ts", false, util.FormatTimeRFC3339Nano(r.Start)),
+		tree.NewProperty("end_ts", false, util.FormatTimeRFC3339Nano(r.End)),
+		tree.NewProperty("step", false, r.Step),
 		tree.NewProperty("range", false, r.RangeInterval),
 	}
 
-	if r.Step != nil {
-		properties = append(properties, tree.NewProperty("step", false, r.Step))
-	}
-
 	if len(r.PartitionBy) > 0 {
 		partitionBy := make([]any, len(r.PartitionBy))
 		for i := range r.PartitionBy {
diff --git a/pkg/engine/planner/logical/format_tree_test.go b/pkg/engine/planner/logical/format_tree_test.go
index c8ce60bf50..cef6719446 100644
--- a/pkg/engine/planner/logical/format_tree_test.go
+++ b/pkg/engine/planner/logical/format_tree_test.go
@@ -128,7 +128,7 @@ func TestFormatRangeAggregationQuery(t *testing.T) {
 		types.RangeAggregationTypeCount,
 		time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC), // Start Time
 		time.Date(1970, 1, 1, 1, 0, 0, 0, time.UTC), // End Time
-		nil,
+		time.Minute,
 		time.Minute*5, // Range
 	)
 
@@ -143,7 +143,7 @@
 	t.Logf("Actual output:\n%s", actual)
 
 	expected := `
-RangeAggregation <%5> table=%4 operation=count start_ts=1970-01-01 00:00:00 +0000 UTC end_ts=1970-01-01 01:00:00 +0000 UTC range=5m0s partition_by=(ambiguous.label1, ambiguous.label2)
+RangeAggregation <%5> table=%4 operation=count start_ts=1970-01-01T00:00:00Z end_ts=1970-01-01T01:00:00Z step=1m0s range=5m0s partition_by=(ambiguous.label1, ambiguous.label2)
 │   ├── ColumnRef column=label1 type=ambiguous
 │   └── ColumnRef column=label2 type=ambiguous
 └── SELECT <%4> table=%2 predicate=%3
diff --git a/pkg/engine/planner/logical/node_range_aggregate.go b/pkg/engine/planner/logical/node_range_aggregate.go
index 26cab1fed1..5780f191f0 100644
--- a/pkg/engine/planner/logical/node_range_aggregate.go
+++ b/pkg/engine/planner/logical/node_range_aggregate.go
@@ -5,6 +5,7 @@ import (
 	"time"
 
"github.com/grafana/loki/v3/pkg/engine/internal/types" + "github.com/grafana/loki/v3/pkg/engine/internal/util" "github.com/grafana/loki/v3/pkg/engine/planner/schema" ) @@ -22,7 +23,7 @@ type RangeAggregation struct { Operation types.RangeAggregationType // The type of aggregation operation to perform. Start time.Time End time.Time - Step *time.Duration // optional for instant queries + Step time.Duration RangeInterval time.Duration } @@ -41,10 +42,7 @@ func (r *RangeAggregation) Name() string { // String returns the disassembled SSA form of the RangeAggregation instruction. func (r *RangeAggregation) String() string { - props := fmt.Sprintf("operation=%s, start_ts=%s, end_ts=%s, range=%s", r.Operation, r.Start, r.End, r.RangeInterval) - if r.Step != nil { - props += fmt.Sprintf(", step=%s", r.Step) - } + props := fmt.Sprintf("operation=%s, start_ts=%s, end_ts=%s, step=%s, range=%s", r.Operation, util.FormatTimeRFC3339Nano(r.Start), util.FormatTimeRFC3339Nano(r.End), r.Step, r.RangeInterval) if len(r.PartitionBy) > 0 { partitionBy := "" diff --git a/pkg/engine/planner/logical/planner.go b/pkg/engine/planner/logical/planner.go index 4c1fd2a09b..951c5d9f5c 100644 --- a/pkg/engine/planner/logical/planner.go +++ b/pkg/engine/planner/logical/planner.go @@ -18,50 +18,74 @@ var errUnimplemented = errors.New("query contains unimplemented features") // BuildPlan converts a LogQL query represented as [logql.Params] into a logical [Plan]. // It may return an error as second argument in case the traversal of the AST of the query fails. -func BuildPlan(query logql.Params) (*Plan, error) { - var selector Value - var predicates []Value +func BuildPlan(params logql.Params) (*Plan, error) { + var ( + builder *Builder + err error + ) - // TODO(chaudum): Implement a Walk function that can return an error - var err error + switch e := params.GetExpression().(type) { + case syntax.LogSelectorExpr: + builder, err = buildPlanForLogQuery(e, params, false, 0) + case syntax.SampleExpr: + builder, err = buildPlanForSampleQuery(e, params) + default: + err = fmt.Errorf("unexpected expression type (%T)", e) + } + + if err != nil { + return nil, fmt.Errorf("failed to convert AST into logical plan: %w", err) + } - expr := query.GetExpression() + return builder.ToPlan() +} + +// buildPlanForLogQuery builds logical plan operations by traversing [syntax.LogSelectorExpr] +// isMetricQuery should be set to true if this expr is encountered when processing a [syntax.SampleExpr]. +// rangeInterval should be set to a non-zero value if the query contains [$range]. +func buildPlanForLogQuery(expr syntax.LogSelectorExpr, params logql.Params, isMetricQuery bool, rangeInterval time.Duration) (*Builder, error) { + var ( + err error + selector Value + predicates []Value + ) + + // TODO(chaudum): Implement a Walk function that can return an error expr.Walk(func(e syntax.Expr) bool { switch e := e.(type) { - case syntax.SampleExpr: - err = errUnimplemented - return false // do not traverse children - case *syntax.LineParserExpr, *syntax.LogfmtParserExpr, *syntax.LogfmtExpressionParserExpr, *syntax.JSONExpressionParserExpr: - err = errUnimplemented - return false // do not traverse children - case *syntax.LineFmtExpr, *syntax.LabelFmtExpr: - err = errUnimplemented - return false // do not traverse children - case *syntax.KeepLabelsExpr, *syntax.DropLabelsExpr: - err = errUnimplemented - return false // do not traverse children + case *syntax.PipelineExpr: + // [PipelineExpr] is a container for other expressions, nothing to do here. 
+			return true
 		case *syntax.MatchersExpr:
 			selector = convertLabelMatchers(e.Matchers())
+			return true
 		case *syntax.LineFilterExpr:
 			predicates = append(predicates, convertLineFilterExpr(e))
 			// We do not want to traverse the AST further down, because line filter expressions can be nested,
 			// which would lead to multiple predicates of the same expression.
-			return false
+			return false // do not traverse children
 		case *syntax.LabelFilterExpr:
 			if val, innerErr := convertLabelFilter(e.LabelFilterer); innerErr != nil {
 				err = innerErr
 			} else {
 				predicates = append(predicates, val)
 			}
+			return true
+		case *syntax.LineParserExpr, *syntax.LogfmtParserExpr, *syntax.LogfmtExpressionParserExpr, *syntax.JSONExpressionParserExpr,
+			*syntax.LineFmtExpr, *syntax.LabelFmtExpr,
+			*syntax.KeepLabelsExpr, *syntax.DropLabelsExpr:
+			err = errUnimplemented
+			return false // do not traverse children
+		default:
+			err = errUnimplemented
+			return false // do not traverse children
 		}
-		return true
 	})
-
 	if err != nil {
-		return nil, fmt.Errorf("failed to convert AST into logical plan: %w", err)
+		return nil, err
 	}
 
-	shard, err := parseShards(query.Shards())
+	shard, err := parseShards(params.Shards())
 	if err != nil {
 		return nil, fmt.Errorf("failed to parse shard: %w", err)
 	}
@@ -74,15 +98,19 @@
 		},
 	)
 
-	// SORT -> SortMerge
-	direction := query.Direction()
-	ascending := direction == logproto.FORWARD
-	builder = builder.Sort(*timestampColumnRef(), ascending, false)
+	// Metric queries currently do not expect the logs to be sorted by timestamp.
+	if !isMetricQuery {
+		// SORT -> SortMerge
+		direction := params.Direction()
+		ascending := direction == logproto.FORWARD
+		builder = builder.Sort(*timestampColumnRef(), ascending, false)
+	}
 
 	// SELECT -> Filter
-	start := query.Start()
-	end := query.End()
-	for _, value := range convertQueryRangeToPredicates(start, end) {
+	start := params.Start()
+	end := params.End()
+	// extend search by rangeInterval to be able to include entries belonging to the [$range] interval.
+	for _, value := range convertQueryRangeToPredicates(start.Add(-rangeInterval), end) {
 		builder = builder.Select(value)
 	}
 
@@ -90,12 +118,89 @@
 		builder = builder.Select(value)
 	}
 
-	// LIMIT -> Limit
-	limit := query.Limit()
-	builder = builder.Limit(0, limit)
+	// Metric queries do not apply a limit.
+	if !isMetricQuery {
+		// LIMIT -> Limit
+		limit := params.Limit()
+		builder = builder.Limit(0, limit)
+	}
+
+	return builder, nil
+}
+
+func buildPlanForSampleQuery(e syntax.SampleExpr, params logql.Params) (*Builder, error) {
+	if params.Step() > 0 {
+		return nil, fmt.Errorf("only instant metric queries are supported: %w", errUnimplemented)
+	}
+
+	var (
+		err error
+
+		rangeAggType  types.RangeAggregationType
+		rangeInterval time.Duration
+
+		vecAggType types.VectorAggregationType
+		groupBy    []ColumnRef
+	)
+
+	e.Walk(func(e syntax.Expr) bool {
+		switch e := e.(type) {
+		case *syntax.RangeAggregationExpr:
+			// only count operation is supported for range aggregation.
+			// offsets are not yet supported.
+			if e.Operation != syntax.OpRangeTypeCount || e.Left.Offset != 0 {
+				err = errUnimplemented
+				return false
+			}
+
+			rangeAggType = types.RangeAggregationTypeCount
+			rangeInterval = e.Left.Interval
+			return false // do not traverse log range query
+
+		case *syntax.VectorAggregationExpr:
+			// only sum operation is supported for vector aggregation.
+			// grouping by at least one label is required.
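+			// e.g. sum by (level) (count_over_time(...)) is supported,
+			// while sum(count_over_time(...)) and "without" groupings are not.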
+			if e.Operation != syntax.OpTypeSum ||
+				e.Grouping == nil || len(e.Grouping.Groups) == 0 || e.Grouping.Without {
+				err = errUnimplemented
+				return false
+			}
+
+			vecAggType = types.VectorAggregationTypeSum
+			groupBy = make([]ColumnRef, 0, len(e.Grouping.Groups))
+			for _, group := range e.Grouping.Groups {
+				groupBy = append(groupBy, *NewColumnRef(group, types.ColumnTypeAmbiguous))
+			}
+
+			return true
+		default:
+			err = errUnimplemented
+			return false // do not traverse children
+		}
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	if rangeAggType == types.RangeAggregationTypeInvalid || vecAggType == types.VectorAggregationTypeInvalid {
+		return nil, errUnimplemented
+	}
+
+	logSelectorExpr, err := e.Selector()
+	if err != nil {
+		return nil, err
+	}
+
+	builder, err := buildPlanForLogQuery(logSelectorExpr, params, true, rangeInterval)
+	if err != nil {
+		return nil, err
+	}
+
+	builder = builder.RangeAggregation(
+		nil, rangeAggType, params.Start(), params.End(), params.Step(), rangeInterval,
+	).VectorAggregation(groupBy, vecAggType)
 
-	plan, err := builder.ToPlan()
-	return plan, err
+	return builder, nil
 }
 
 func convertLabelMatchers(matchers []*labels.Matcher) Value {
diff --git a/pkg/engine/planner/logical/planner_test.go b/pkg/engine/planner/logical/planner_test.go
index 87efec96e4..f5e2c48c41 100644
--- a/pkg/engine/planner/logical/planner_test.go
+++ b/pkg/engine/planner/logical/planner_test.go
@@ -14,10 +14,11 @@
 )
 
 type query struct {
-	statement  string
-	start, end int64
-	direction  logproto.Direction
-	limit      uint32
+	statement      string
+	start, end     int64
+	step, interval time.Duration
+	direction      logproto.Direction
+	limit          uint32
 }
 
 // Direction implements logql.Params.
@@ -72,7 +73,7 @@ func (q *query) Shards() []string {
 
 // Step implements logql.Params.
 func (q *query) Step() time.Duration {
-	panic("unimplemented")
+	return q.step
 }
 
 var _ logql.Params = (*query)(nil)
@@ -120,6 +121,41 @@ RETURN %20
 	t.Logf("\n%s\n", sb.String())
 }
 
+func TestConvertAST_MetricQuery_Success(t *testing.T) {
+	q := &query{
+		statement: `sum by (level) (count_over_time({cluster="prod", namespace=~"loki-.*"} |= "metric.go"[5m]))`,
+		start:     3600,
+		end:       7200,
+		interval:  5 * time.Minute,
+	}
+
+	logicalPlan, err := BuildPlan(q)
+	require.NoError(t, err)
+	t.Logf("\n%s\n", logicalPlan.String())
+
+	expected := `%1 = EQ label.cluster "prod"
+%2 = MATCH_RE label.namespace "loki-.*"
+%3 = AND %1 %2
+%4 = MAKETABLE [selector=%3, shard=0_of_1]
+%5 = GTE builtin.timestamp 1970-01-01T00:55:00Z
+%6 = SELECT %4 [predicate=%5]
+%7 = LT builtin.timestamp 1970-01-01T02:00:00Z
+%8 = SELECT %6 [predicate=%7]
+%9 = MATCH_STR builtin.message "metric.go"
+%10 = SELECT %8 [predicate=%9]
+%11 = RANGE_AGGREGATION %10 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s]
+%12 = VECTOR_AGGREGATION %11 [operation=sum, group_by=(ambiguous.level)]
+RETURN %12
+`
+
+	require.Equal(t, expected, logicalPlan.String())
+
+	var sb strings.Builder
+	PrintTree(&sb, logicalPlan.Value())
+
+	t.Logf("\n%s\n", sb.String())
+}
+
 func TestCanExecuteQuery(t *testing.T) {
 	for _, tt := range []struct {
 		statement string
@@ -179,7 +215,27 @@ func TestCanExecuteQuery(t *testing.T) {
 			statement: `{env="prod"} |= "metric.go" | retry > 2`,
 		},
 		{
-			statement: `sum(rate({env="prod"}[1m]))`,
+			statement: `sum by (level) (count_over_time({env="prod"}[1m]))`,
+			expected:  true,
+		},
+		{
+			// both vector and range aggregation are required
+			statement: `count_over_time({env="prod"}[1m])`,
+		},
+		{
+			// group by labels are required
+			statement: `sum(count_over_time({env="prod"}[1m]))`,
+		},
+		{
+			// rate is not supported
+			statement: `sum by (level) (rate({env="prod"}[1m]))`,
+		},
+		{
+			// max is not supported
+			statement: `max by (level) (count_over_time({env="prod"}[1m]))`,
+		},
+		{
+			statement: `sum by (level) (count_over_time({env="prod"}[1m] offset 5m))`,
 		},
 	} {
 		t.Run(tt.statement, func(t *testing.T) {
diff --git a/pkg/engine/planner/physical/context.go b/pkg/engine/planner/physical/catalog.go
similarity index 82%
rename from pkg/engine/planner/physical/context.go
rename to pkg/engine/planner/physical/catalog.go
index dcfbf0d90f..eff7ab0a2e 100644
--- a/pkg/engine/planner/physical/context.go
+++ b/pkg/engine/planner/physical/catalog.go
@@ -41,35 +41,32 @@ type Catalog interface {
 	// ResolveDataObj returns a list of data object paths,
 	// a list of stream IDs for each data data object,
 	// and a list of sections for each data object
-	ResolveDataObj(Expression) ([]DataObjLocation, [][]int64, [][]int, error)
-	ResolveDataObjWithShard(Expression, ShardInfo) ([]DataObjLocation, [][]int64, [][]int, error)
+	ResolveDataObj(Expression, time.Time, time.Time) ([]DataObjLocation, [][]int64, [][]int, error)
+	ResolveDataObjWithShard(Expression, ShardInfo, time.Time, time.Time) ([]DataObjLocation, [][]int64, [][]int, error)
 }
 
-// Context is the default implementation of [Catalog].
-type Context struct {
-	ctx           context.Context
-	metastore     metastore.Metastore
-	from, through time.Time
+// MetastoreCatalog is the default implementation of [Catalog].
+type MetastoreCatalog struct {
+	ctx       context.Context
+	metastore metastore.Metastore
 }
 
-// NewContext creates a new instance of [Context] for query planning.
-func NewContext(ctx context.Context, ms metastore.Metastore, from, through time.Time) *Context {
-	return &Context{
+// NewMetastoreCatalog creates a new instance of [MetastoreCatalog] for query planning.
+func NewMetastoreCatalog(ctx context.Context, ms metastore.Metastore) *MetastoreCatalog {
+	return &MetastoreCatalog{
 		ctx:       ctx,
 		metastore: ms,
-		from:      from,
-		through:   through,
 	}
 }
 
 // ResolveDataObj resolves DataObj locations and streams IDs based on a given
 // [Expression]. The expression is required to be a (tree of) [BinaryExpression]
 // with a [ColumnExpression] on the left and a [LiteralExpression] on the right.
-func (c *Context) ResolveDataObj(selector Expression) ([]DataObjLocation, [][]int64, [][]int, error) {
-	return c.ResolveDataObjWithShard(selector, noShard)
+func (c *MetastoreCatalog) ResolveDataObj(selector Expression, from, through time.Time) ([]DataObjLocation, [][]int64, [][]int, error) {
+	return c.ResolveDataObjWithShard(selector, noShard, from, through)
 }
 
-func (c *Context) ResolveDataObjWithShard(selector Expression, shard ShardInfo) ([]DataObjLocation, [][]int64, [][]int, error) {
+func (c *MetastoreCatalog) ResolveDataObjWithShard(selector Expression, shard ShardInfo, from, through time.Time) ([]DataObjLocation, [][]int64, [][]int, error) {
 	if c.metastore == nil {
 		return nil, nil, nil, errors.New("no metastore to resolve objects")
 	}
@@ -79,7 +76,7 @@
 		return nil, nil, nil, fmt.Errorf("failed to convert selector expression into matchers: %w", err)
 	}
 
-	paths, streamIDs, numSections, err := c.metastore.StreamIDs(c.ctx, c.from, c.through, matchers...)
+	paths, streamIDs, numSections, err := c.metastore.StreamIDs(c.ctx, from, through, matchers...)
 	if err != nil {
 		return nil, nil, nil, fmt.Errorf("failed to resolve data object locations: %w", err)
 	}
@@ -187,4 +184,4 @@
 	return ty, nil
 }
 
-var _ Catalog = (*Context)(nil)
+var _ Catalog = (*MetastoreCatalog)(nil)
diff --git a/pkg/engine/planner/physical/context_test.go b/pkg/engine/planner/physical/catalog_test.go
similarity index 95%
rename from pkg/engine/planner/physical/context_test.go
rename to pkg/engine/planner/physical/catalog_test.go
index 3f929f0299..c554a8d72e 100644
--- a/pkg/engine/planner/physical/context_test.go
+++ b/pkg/engine/planner/physical/catalog_test.go
@@ -10,7 +10,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/engine/internal/types"
 )
 
-func TestContext_ConvertLiteral(t *testing.T) {
+func TestCatalog_ConvertLiteral(t *testing.T) {
 	tests := []struct {
 		expr Expression
 		want string
@@ -63,7 +63,7 @@
 	}
 }
 
-func TestContext_ConvertColumnRef(t *testing.T) {
+func TestCatalog_ConvertColumnRef(t *testing.T) {
 	tests := []struct {
 		expr Expression
 		want string
@@ -108,7 +108,7 @@
 	}
 }
 
-func TestContext_ExpressionToMatchers(t *testing.T) {
+func TestCatalog_ExpressionToMatchers(t *testing.T) {
 	tests := []struct {
 		expr Expression
 		want []*labels.Matcher
diff --git a/pkg/engine/planner/physical/planner.go b/pkg/engine/planner/physical/planner.go
index 276fa90a67..694f0790db 100644
--- a/pkg/engine/planner/physical/planner.go
+++ b/pkg/engine/planner/physical/planner.go
@@ -3,13 +3,62 @@ package physical
 
 import (
 	"errors"
 	"fmt"
+	"time"
 
 	"github.com/grafana/loki/v3/pkg/engine/planner/logical"
 )
 
-// Internal state of the planner
-type state struct {
-	direction SortOrder
+// Context carries planning state that needs to be propagated down the plan tree.
+// This enables each branch to have independent context, which is essential for complex queries:
+//   - Binary operations with different [$range] intervals, offsets or @timestamp.
+//
+// Propagation:
+//   - Each process() method passes context to children.
+//   - Context can be cloned and modified by nodes (e.g. RangeAggregation sets rangeInterval);
+//     the modified context is then inherited by all descendants.
+type Context struct {
+	from          time.Time
+	through       time.Time
+	rangeInterval time.Duration
+	direction     SortOrder
+}
+
+func NewContext(from, through time.Time) *Context {
+	return &Context{
+		from:    from,
+		through: through,
+	}
+}
+
+func (pc *Context) Clone() *Context {
+	return &Context{
+		from:          pc.from,
+		through:       pc.through,
+		rangeInterval: pc.rangeInterval,
+		direction:     pc.direction,
+	}
+}
+
+func (pc *Context) WithDirection(direction SortOrder) *Context {
+	cloned := pc.Clone()
+	cloned.direction = direction
+	return cloned
+}
+
+func (pc *Context) WithRangeInterval(rangeInterval time.Duration) *Context {
+	cloned := pc.Clone()
+	cloned.rangeInterval = rangeInterval
+	return cloned
+}
+
+func (pc *Context) WithTimeRange(from, through time.Time) *Context {
+	cloned := pc.Clone()
+	cloned.from = from
+	cloned.through = through
+	return cloned
+}
+
+func (pc *Context) GetResolveTimeRange() (from, through time.Time) {
+	return pc.from.Add(-pc.rangeInterval), pc.through
 }
 
 // Planner creates an executable physical plan from a logical plan.
@@ -21,14 +70,17 @@ type state struct {
 // a) Push down the limit of the Limit node to the DataObjScan nodes.
 // b) Push down the predicate from the Filter node to the DataObjScan nodes.
 type Planner struct {
+	context *Context
 	catalog Catalog
 	plan    *Plan
-	state   state
 }
 
 // NewPlanner creates a new planner instance with the given context.
-func NewPlanner(catalog Catalog) *Planner {
-	return &Planner{catalog: catalog}
+func NewPlanner(ctx *Context, catalog Catalog) *Planner {
+	return &Planner{
+		context: ctx,
+		catalog: catalog,
+	}
 }
 
 // Build converts a given logical plan into a physical plan and returns an error if the conversion fails.
@@ -38,7 +90,7 @@
 	for _, inst := range lp.Instructions {
 		switch inst := inst.(type) {
 		case *logical.Return:
-			nodes, err := p.process(inst.Value)
+			nodes, err := p.process(inst.Value, p.context)
 			if err != nil {
 				return nil, err
 			}
@@ -54,7 +106,6 @@
 // reset resets the internal state of the planner
 func (p *Planner) reset() {
 	p.plan = &Plan{}
-	p.state = state{}
 }
 
 // Convert a predicate from an [logical.Instruction] into an [Expression].
@@ -81,32 +132,33 @@
 }
 
 // Convert a [logical.Instruction] into one or multiple [Node]s.
-func (p *Planner) process(inst logical.Value) ([]Node, error) {
+func (p *Planner) process(inst logical.Value, ctx *Context) ([]Node, error) {
 	switch inst := inst.(type) {
 	case *logical.MakeTable:
-		return p.processMakeTable(inst)
+		return p.processMakeTable(inst, ctx)
 	case *logical.Select:
-		return p.processSelect(inst)
+		return p.processSelect(inst, ctx)
 	case *logical.Sort:
-		return p.processSort(inst)
+		return p.processSort(inst, ctx)
 	case *logical.Limit:
-		return p.processLimit(inst)
+		return p.processLimit(inst, ctx)
 	case *logical.RangeAggregation:
-		return p.processRangeAggregation(inst)
+		return p.processRangeAggregation(inst, ctx)
 	case *logical.VectorAggregation:
-		return p.processVectorAggregation(inst)
+		return p.processVectorAggregation(inst, ctx)
 	}
 	return nil, nil
 }
 
 // Convert [logical.MakeTable] into one or more [DataObjScan] nodes.
-func (p *Planner) processMakeTable(lp *logical.MakeTable) ([]Node, error) {
+func (p *Planner) processMakeTable(lp *logical.MakeTable, ctx *Context) ([]Node, error) {
 	shard, ok := lp.Shard.(*logical.ShardInfo)
 	if !ok {
 		return nil, fmt.Errorf("invalid shard, got %T", lp.Shard)
 	}
 
-	objects, streams, sections, err := p.catalog.ResolveDataObjWithShard(p.convertPredicate(lp.Selector), ShardInfo(*shard))
+	from, through := ctx.GetResolveTimeRange()
+	objects, streams, sections, err := p.catalog.ResolveDataObjWithShard(p.convertPredicate(lp.Selector), ShardInfo(*shard), from, through)
 	if err != nil {
 		return nil, err
 	}
@@ -117,7 +169,7 @@
 			Location:  objects[i],
 			StreamIDs: streams[i],
 			Sections:  sections[i],
-			Direction: p.state.direction, // apply direction from previously visited Sort node
+			Direction: ctx.direction, // apply direction from previously visited Sort node
 		}
 		p.plan.addNode(node)
 		nodes = append(nodes, node)
@@ -126,12 +178,12 @@
 }
 
 // Convert [logical.Select] into one [Filter] node.
-func (p *Planner) processSelect(lp *logical.Select) ([]Node, error) {
+func (p *Planner) processSelect(lp *logical.Select, ctx *Context) ([]Node, error) {
 	node := &Filter{
 		Predicates: []Expression{p.convertPredicate(lp.Predicate)},
 	}
 	p.plan.addNode(node)
-	children, err := p.process(lp.Table)
+	children, err := p.process(lp.Table, ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -144,7 +196,7 @@
 }
 
 // Convert [logical.Sort] into one [SortMerge] node.
-func (p *Planner) processSort(lp *logical.Sort) ([]Node, error) {
+func (p *Planner) processSort(lp *logical.Sort, ctx *Context) ([]Node, error) {
 	order := DESC
 	if lp.Ascending {
 		order = ASC
@@ -153,12 +205,14 @@
 		Column: &ColumnExpr{Ref: lp.Column.Ref},
 		Order:  order,
 	}
-	p.state.direction = order
+
 	p.plan.addNode(node)
-	children, err := p.process(lp.Table)
+
+	children, err := p.process(lp.Table, ctx.WithDirection(order))
 	if err != nil {
 		return nil, err
 	}
+
 	for i := range children {
 		if err := p.plan.addEdge(Edge{Parent: node, Child: children[i]}); err != nil {
 			return nil, err
 		}
	}
 	return []Node{node}, nil
 }
 
 // Convert [logical.Limit] into one [Limit] node.
-func (p *Planner) processLimit(lp *logical.Limit) ([]Node, error) {
+func (p *Planner) processLimit(lp *logical.Limit, ctx *Context) ([]Node, error) {
 	node := &Limit{
 		Skip:  lp.Skip,
 		Fetch: lp.Fetch,
 	}
 	p.plan.addNode(node)
-	children, err := p.process(lp.Table)
+	children, err := p.process(lp.Table, ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -186,7 +240,7 @@
 	return []Node{node}, nil
 }
 
-func (p *Planner) processRangeAggregation(r *logical.RangeAggregation) ([]Node, error) {
+func (p *Planner) processRangeAggregation(r *logical.RangeAggregation, ctx *Context) ([]Node, error) {
 	partitionBy := make([]ColumnExpression, len(r.PartitionBy))
 	for i, col := range r.PartitionBy {
 		partitionBy[i] = &ColumnExpr{Ref: col.Ref}
@@ -201,10 +255,12 @@
 		Step:        r.Step,
 	}
 	p.plan.addNode(node)
-	children, err := p.process(r.Table)
+
+	children, err := p.process(r.Table, ctx.WithRangeInterval(r.RangeInterval))
 	if err != nil {
 		return nil, err
 	}
+
 	for i := range children {
 		if err := p.plan.addEdge(Edge{Parent: node, Child: children[i]}); err != nil {
 			return nil, err
@@ -214,7 +270,7 @@
 }
 
 // Convert [logical.VectorAggregation] into one [VectorAggregation] node.
-func (p *Planner) processVectorAggregation(lp *logical.VectorAggregation) ([]Node, error) {
+func (p *Planner) processVectorAggregation(lp *logical.VectorAggregation, ctx *Context) ([]Node, error) {
 	groupBy := make([]ColumnExpression, len(lp.GroupBy))
 	for i, col := range lp.GroupBy {
 		groupBy[i] = &ColumnExpr{Ref: col.Ref}
@@ -225,7 +281,7 @@
 		Operation: lp.Operation,
 	}
 	p.plan.addNode(node)
-	children, err := p.process(lp.Table)
+	children, err := p.process(lp.Table, ctx)
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/engine/planner/physical/planner_test.go b/pkg/engine/planner/physical/planner_test.go
index f510c99084..494e2d9764 100644
--- a/pkg/engine/planner/physical/planner_test.go
+++ b/pkg/engine/planner/physical/planner_test.go
@@ -21,12 +21,12 @@ type catalog struct {
 }
 
 // ResolveDataObj implements Catalog.
-func (c *catalog) ResolveDataObj(e Expression) ([]DataObjLocation, [][]int64, [][]int, error) {
-	return c.ResolveDataObjWithShard(e, noShard)
+func (c *catalog) ResolveDataObj(e Expression, from, through time.Time) ([]DataObjLocation, [][]int64, [][]int, error) {
+	return c.ResolveDataObjWithShard(e, noShard, from, through)
 }
 
 // ResolveDataObjForShard implements Catalog.
-func (c *catalog) ResolveDataObjWithShard(_ Expression, shard ShardInfo) ([]DataObjLocation, [][]int64, [][]int, error) {
+func (c *catalog) ResolveDataObjWithShard(_ Expression, shard ShardInfo, _, _ time.Time) ([]DataObjLocation, [][]int64, [][]int, error) {
 	paths := make([]string, 0, len(c.streamsByObject))
 	streams := make([][]int64, 0, len(c.streamsByObject))
 	sections := make([]int, 0, len(c.streamsByObject))
@@ -99,7 +99,7 @@ func TestMockCatalog(t *testing.T) {
 		},
 	} {
 		t.Run("shard "+tt.shard.String(), func(t *testing.T) {
-			paths, streams, sections, _ := catalog.ResolveDataObjWithShard(nil, tt.shard)
+			paths, streams, sections, _ := catalog.ResolveDataObjWithShard(nil, tt.shard, time.Now(), time.Now())
 			require.Equal(t, tt.expPaths, paths)
 			require.Equal(t, tt.expStreams, streams)
 			require.Equal(t, tt.expSections, sections)
@@ -142,7 +142,7 @@ func TestPlanner_ConvertMaketable(t *testing.T) {
 			"obj5": {streamIDs: []int64{4, 5}, sections: 2},
 		},
 	}
-	planner := NewPlanner(catalog)
+	planner := NewPlanner(NewContext(time.Now(), time.Now()), catalog)
 
 	streamSelector := &logical.BinOp{
 		Left:  logical.NewColumnRef("app", types.ColumnTypeLabel),
@@ -197,7 +197,7 @@
 				Shard: tt.shard,
 			}
 			planner.reset()
-			nodes, err := planner.processMakeTable(relation)
+			nodes, err := planner.processMakeTable(relation, NewContext(time.Now(), time.Now()))
 			require.NoError(t, err)
 
 			require.Equal(t, tt.expPaths, locations(t, nodes))
@@ -245,7 +245,7 @@ func TestPlanner_Convert(t *testing.T) {
 			"obj2": {streamIDs: []int64{3, 4}, sections: 1},
 		},
 	}
-	planner := NewPlanner(catalog)
+	planner := NewPlanner(NewContext(time.Now(), time.Now()), catalog)
 
 	physicalPlan, err := planner.Build(logicalPlan)
 	require.NoError(t, err)
@@ -284,7 +284,7 @@ func TestPlanner_Convert_RangeAggregations(t *testing.T) {
 		types.RangeAggregationTypeCount,
 		time.Date(2023, 10, 1, 0, 0, 0, 0, time.UTC), // Start Time
 		time.Date(2023, 10, 1, 1, 0, 0, 0, time.UTC), // End Time
-		nil,           // Step
+		0,             // Step
 		time.Minute*5, // Range
 	)
 
@@ -297,7 +297,7 @@
 			"obj2": {streamIDs: []int64{3, 4}, sections: 1},
 		},
 	}
-	planner := NewPlanner(catalog)
+	planner := NewPlanner(NewContext(time.Now(), time.Now()), catalog)
 
 	physicalPlan, err := planner.Build(logicalPlan)
 	require.NoError(t, err)
diff --git a/pkg/engine/planner/physical/printer.go b/pkg/engine/planner/physical/printer.go
index e51e5594fe..0ddf8d87f5 100644
--- a/pkg/engine/planner/physical/printer.go
+++ b/pkg/engine/planner/physical/printer.go
@@ -62,13 +62,10 @@ func toTreeNode(n Node) *tree.Node {
 			tree.NewProperty("operation", false, node.Operation),
 			tree.NewProperty("start", false, node.Start),
 			tree.NewProperty("end", false, node.End),
+			tree.NewProperty("step", false, node.Step),
 			tree.NewProperty("range", false, node.Range),
 		}
 
-		if node.Step != nil {
-			properties = append(properties, tree.NewProperty("step", false, node.Step))
-		}
-
 		if len(node.PartitionBy) > 0 {
 			properties = append(properties, tree.NewProperty("partition_by", true, toAnySlice(node.PartitionBy)...))
 		}
diff --git a/pkg/engine/planner/physical/range_aggregate.go b/pkg/engine/planner/physical/range_aggregate.go
index 3d71e587b0..2994273f25 100644
--- a/pkg/engine/planner/physical/range_aggregate.go
+++ b/pkg/engine/planner/physical/range_aggregate.go
@@ -16,7 +16,7 @@ type RangeAggregation struct {
 	Operation types.RangeAggregationType
 	Start     time.Time
 	End       time.Time
-	Step      *time.Duration // optional for instant queries
+	Step      time.Duration // zero for instant queries
 	Range     time.Duration
 }
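
Reviewer note: a minimal sketch of how the pieces above fit together. Builder selection mirrors engine.go's Execute; the pipeline, params, and error handling here are simplified assumptions, and this snippet is not part of the patch:

	// Sketch only. Assumes a pipeline yielding Arrow records and params whose
	// expression has already passed logical planning.
	func buildResult(ctx context.Context, params logql.Params, pipeline executor.Pipeline) (logqlmodel.Result, error) {
		var builder ResultBuilder
		switch params.GetExpression().(type) {
		case syntax.LogSelectorExpr:
			builder = newStreamsResultBuilder() // log query -> logqlmodel.Streams
		case syntax.SampleExpr:
			builder = newVectorResultBuilder() // instant metric query -> promql.Vector
		default:
			return logqlmodel.Result{}, ErrNotSupported
		}
		if err := collectResult(ctx, pipeline, builder); err != nil {
			return logqlmodel.Result{}, err
		}
		// Data is Streams or Vector depending on the builder that collected the records.
		return builder.Build(), nil
	}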