fix: errors in parse pipeline (#19667)

Co-authored-by: Ashwanth Goli <iamashwanth@gmail.com>
pull/19669/head
Trevor Whitney 2 months ago committed by GitHub
parent c3f12ef3df
commit dd6b31473e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 4
      pkg/engine/internal/executor/parse.go
  2. 18
      pkg/engine/internal/executor/project.go
  3. 4
      pkg/engine/internal/util/array_list.go
  4. 7
      pkg/logql/bench/Makefile
  5. 5
      pkg/logql/bench/config.yaml

@ -43,6 +43,10 @@ func parseFn(op types.VariadicOp) VariadicFunction {
newFields = append(newFields, semconv.FieldFromIdent(ident, true))
}
if len(parsedColumns) == 0 {
return nil, nil
}
return array.NewStructArrayWithFields(parsedColumns, newFields)
})
}

@ -21,22 +21,24 @@ func NewProjectPipeline(input Pipeline, proj *physical.Projection, evaluator *ex
// Get the column names from the projection expressions
colRefs := make([]types.ColumnRef, 0, len(proj.Expressions))
mathExprs := make([]physical.Expression, 0, len(proj.Expressions))
expandExprs := make([]physical.Expression, 0, len(proj.Expressions))
for i, expr := range proj.Expressions {
switch expr := expr.(type) {
case *physical.ColumnExpr:
colRefs = append(colRefs, expr.Ref)
case *physical.UnaryExpr:
mathExprs = append(mathExprs, expr)
expandExprs = append(expandExprs, expr)
case *physical.BinaryExpr:
mathExprs = append(mathExprs, expr)
expandExprs = append(expandExprs, expr)
case *physical.VariadicExpr:
expandExprs = append(expandExprs, expr)
default:
return nil, fmt.Errorf("projection expression %d is unsupported", i)
}
}
if len(mathExprs) > 1 {
if len(expandExprs) > 1 {
return nil, fmt.Errorf("there might be only one math expression for `value` column at a time")
}
@ -73,8 +75,8 @@ func NewProjectPipeline(input Pipeline, proj *physical.Projection, evaluator *ex
// Create EXPAND projection pipeline:
// Keep all columns and expand the ones referenced in proj.Expressions.
// TODO: as implemented, epanding and keeping/dropping cannot happen in the same projection. Is this desired?
if proj.All && proj.Expand && len(mathExprs) > 0 {
return newExpandPipeline(mathExprs[0], evaluator, input)
if proj.All && proj.Expand && len(expandExprs) > 0 {
return newExpandPipeline(expandExprs[0], evaluator, input)
}
return nil, errNotImplemented
@ -143,6 +145,10 @@ func newExpandPipeline(expr physical.Expression, evaluator *expressionEvaluator,
return nil, err
}
if vec == nil {
return batch, nil
}
switch arrCasted := vec.(type) {
case *array.Struct:
structSchema, ok := arrCasted.DataType().(*arrow.StructType)

@ -5,6 +5,10 @@ import (
)
func ArrayListValue(arr *array.List, i int) any {
if arr.Len() == 0 {
return []string{}
}
start, end := arr.ValueOffsets(i)
listValues := arr.ListValues()
switch listValues := listValues.(type) {

@ -34,6 +34,7 @@ grafana:
-e GF_AUTH_ANONYMOUS_ORG_ROLE=Admin \
-e GF_AUTH_BASIC_ENABLED=false \
-e GF_USERS_DEFAULT_THEME=light \
-e GF_CACHING_ENABLED=false \
-v $(PWD)/grafana-datasource.yaml:/etc/grafana/provisioning/datasources/datasources.yml \
grafana/grafana:latest
@echo "Grafana available at http://localhost:3000"
@ -72,5 +73,9 @@ server: grafana loki
# so this task defines a teardown command to kill grafana once loki is stopped.
.PHONY: server-debug
server-debug: grafana loki-debug
# loki will block, so this won't run until we kill the process
.PHONY: server-stop
server-stop:
@make grafana-stop
@pkill dlv
@pkill loki

@ -1,5 +1,5 @@
# Loki configuration file for reading data from the logql benchmark dataset.
# Run Loki from the project root
# Run Loki from the project root
# $ make loki
# $ ./cmd/loki/loki -config.file=./pkg/logql/bench/config.yaml -config.expand-env=true
#
@ -83,6 +83,9 @@ querier:
enable: true
batch_size: 8192
query_range:
cache_results: false
dataobj:
storage_bucket_prefix: "dataobj"

Loading…
Cancel
Save