From dd6b31473e21dcfff5b67a0bdbcaf77ab840fbb0 Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Fri, 31 Oct 2025 11:36:55 -0600 Subject: [PATCH] fix: errors in parse pipeline (#19667) Co-authored-by: Ashwanth Goli --- pkg/engine/internal/executor/parse.go | 4 ++++ pkg/engine/internal/executor/project.go | 18 ++++++++++++------ pkg/engine/internal/util/array_list.go | 4 ++++ pkg/logql/bench/Makefile | 7 ++++++- pkg/logql/bench/config.yaml | 5 ++++- 5 files changed, 30 insertions(+), 8 deletions(-) diff --git a/pkg/engine/internal/executor/parse.go b/pkg/engine/internal/executor/parse.go index e804b9bede..94f86d68b0 100644 --- a/pkg/engine/internal/executor/parse.go +++ b/pkg/engine/internal/executor/parse.go @@ -43,6 +43,10 @@ func parseFn(op types.VariadicOp) VariadicFunction { newFields = append(newFields, semconv.FieldFromIdent(ident, true)) } + if len(parsedColumns) == 0 { + return nil, nil + } + return array.NewStructArrayWithFields(parsedColumns, newFields) }) } diff --git a/pkg/engine/internal/executor/project.go b/pkg/engine/internal/executor/project.go index a9d00311dd..7055219d6a 100644 --- a/pkg/engine/internal/executor/project.go +++ b/pkg/engine/internal/executor/project.go @@ -21,22 +21,24 @@ func NewProjectPipeline(input Pipeline, proj *physical.Projection, evaluator *ex // Get the column names from the projection expressions colRefs := make([]types.ColumnRef, 0, len(proj.Expressions)) - mathExprs := make([]physical.Expression, 0, len(proj.Expressions)) + expandExprs := make([]physical.Expression, 0, len(proj.Expressions)) for i, expr := range proj.Expressions { switch expr := expr.(type) { case *physical.ColumnExpr: colRefs = append(colRefs, expr.Ref) case *physical.UnaryExpr: - mathExprs = append(mathExprs, expr) + expandExprs = append(expandExprs, expr) case *physical.BinaryExpr: - mathExprs = append(mathExprs, expr) + expandExprs = append(expandExprs, expr) + case *physical.VariadicExpr: + expandExprs = append(expandExprs, expr) default: return nil, fmt.Errorf("projection expression %d is unsupported", i) } } - if len(mathExprs) > 1 { + if len(expandExprs) > 1 { return nil, fmt.Errorf("there might be only one math expression for `value` column at a time") } @@ -73,8 +75,8 @@ func NewProjectPipeline(input Pipeline, proj *physical.Projection, evaluator *ex // Create EXPAND projection pipeline: // Keep all columns and expand the ones referenced in proj.Expressions. // TODO: as implemented, epanding and keeping/dropping cannot happen in the same projection. Is this desired? - if proj.All && proj.Expand && len(mathExprs) > 0 { - return newExpandPipeline(mathExprs[0], evaluator, input) + if proj.All && proj.Expand && len(expandExprs) > 0 { + return newExpandPipeline(expandExprs[0], evaluator, input) } return nil, errNotImplemented @@ -143,6 +145,10 @@ func newExpandPipeline(expr physical.Expression, evaluator *expressionEvaluator, return nil, err } + if vec == nil { + return batch, nil + } + switch arrCasted := vec.(type) { case *array.Struct: structSchema, ok := arrCasted.DataType().(*arrow.StructType) diff --git a/pkg/engine/internal/util/array_list.go b/pkg/engine/internal/util/array_list.go index 6eea899d3e..c5e6c87178 100644 --- a/pkg/engine/internal/util/array_list.go +++ b/pkg/engine/internal/util/array_list.go @@ -5,6 +5,10 @@ import ( ) func ArrayListValue(arr *array.List, i int) any { + if arr.Len() == 0 { + return []string{} + } + start, end := arr.ValueOffsets(i) listValues := arr.ListValues() switch listValues := listValues.(type) { diff --git a/pkg/logql/bench/Makefile b/pkg/logql/bench/Makefile index 6e6995d670..24f2818106 100644 --- a/pkg/logql/bench/Makefile +++ b/pkg/logql/bench/Makefile @@ -34,6 +34,7 @@ grafana: -e GF_AUTH_ANONYMOUS_ORG_ROLE=Admin \ -e GF_AUTH_BASIC_ENABLED=false \ -e GF_USERS_DEFAULT_THEME=light \ + -e GF_CACHING_ENABLED=false \ -v $(PWD)/grafana-datasource.yaml:/etc/grafana/provisioning/datasources/datasources.yml \ grafana/grafana:latest @echo "Grafana available at http://localhost:3000" @@ -72,5 +73,9 @@ server: grafana loki # so this task defines a teardown command to kill grafana once loki is stopped. .PHONY: server-debug server-debug: grafana loki-debug - # loki will block, so this won't run until we kill the process + +.PHONY: server-stop +server-stop: @make grafana-stop + @pkill dlv + @pkill loki diff --git a/pkg/logql/bench/config.yaml b/pkg/logql/bench/config.yaml index 6625b85d1d..816cebe60e 100644 --- a/pkg/logql/bench/config.yaml +++ b/pkg/logql/bench/config.yaml @@ -1,5 +1,5 @@ # Loki configuration file for reading data from the logql benchmark dataset. -# Run Loki from the project root +# Run Loki from the project root # $ make loki # $ ./cmd/loki/loki -config.file=./pkg/logql/bench/config.yaml -config.expand-env=true # @@ -83,6 +83,9 @@ querier: enable: true batch_size: 8192 +query_range: + cache_results: false + dataobj: storage_bucket_prefix: "dataobj"