chore(engine): Implement new semantic column naming (#19415)

This PR implements the column naming convention introduced in https://github.com/grafana/loki/pull/19396

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
pull/19028/head^2
Christian Haudum 3 months ago committed by GitHub
parent 50e7cf0bcc
commit 2214a700b3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 78
      pkg/engine/compat.go
  2. 182
      pkg/engine/compat_test.go
  3. 24
      pkg/engine/internal/executor/aggregator.go
  4. 61
      pkg/engine/internal/executor/aggregator_test.go
  5. 22
      pkg/engine/internal/executor/dataobjscan.go
  6. 60
      pkg/engine/internal/executor/dataobjscan_test.go
  7. 128
      pkg/engine/internal/executor/expressions.go
  8. 21
      pkg/engine/internal/executor/expressions_test.go
  9. 62
      pkg/engine/internal/executor/filter_test.go
  10. 62
      pkg/engine/internal/executor/parse.go
  11. 500
      pkg/engine/internal/executor/parse_test.go
  12. 2
      pkg/engine/internal/executor/pipeline_utils_test.go
  13. 5
      pkg/engine/internal/executor/project.go
  14. 23
      pkg/engine/internal/executor/project_test.go
  15. 144
      pkg/engine/internal/executor/range_aggregation_test.go
  16. 4
      pkg/engine/internal/executor/schema.go
  17. 34
      pkg/engine/internal/executor/stream_injector.go
  18. 16
      pkg/engine/internal/executor/stream_injector_test.go
  19. 40
      pkg/engine/internal/executor/topk.go
  20. 51
      pkg/engine/internal/executor/topk_test.go
  21. 29
      pkg/engine/internal/executor/util.go
  22. 8
      pkg/engine/internal/executor/util_test.go
  23. 9
      pkg/engine/internal/executor/vector_aggregate_test.go
  24. 2
      pkg/engine/internal/semconv/arrow.go
  25. 89
      pkg/engine/internal/semconv/identifier.go
  26. 14
      pkg/engine/internal/semconv/identifier_test.go
  27. 16
      pkg/engine/internal/types/util.go

@ -9,6 +9,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logqlmodel"
@ -73,68 +74,70 @@ func (b *streamsResultBuilder) collectRow(rec arrow.Record, i int) (labels.Label
for colIdx := range int(rec.NumCols()) {
col := rec.Column(colIdx)
colName := rec.ColumnName(colIdx)
// Ignore column values that are NULL or invalid
if col.IsNull(i) || !col.IsValid(i) {
continue
}
// TODO(chaudum): We need to add metadata to columns to identify builtins, labels, metadata, and parsed.
field := rec.Schema().Field(colIdx)
colType, ok := field.Metadata.GetValue(types.MetadataKeyColumnType)
// Ignore column values that are NULL or invalid or don't have a column typ
if col.IsNull(i) || !col.IsValid(i) || !ok {
ident, err := semconv.ParseFQN(field.Name)
if err != nil {
continue
}
shortName := ident.ShortName()
// Extract line
if colName == types.ColumnNameBuiltinMessage && colType == types.ColumnTypeBuiltin.String() {
if ident.Equal(semconv.ColumnIdentMessage) {
entry.Line = col.(*array.String).Value(i)
continue
}
// Extract timestamp
if colName == types.ColumnNameBuiltinTimestamp && colType == types.ColumnTypeBuiltin.String() {
if ident.Equal(semconv.ColumnIdentTimestamp) {
entry.Timestamp = time.Unix(0, int64(col.(*array.Timestamp).Value(i)))
continue
}
// Extract label
if colType == types.ColumnTypeLabel.String() {
if ident.ColumnType() == types.ColumnTypeLabel {
switch arr := col.(type) {
case *array.String:
lbs.Set(colName, arr.Value(i))
lbs.Set(shortName, arr.Value(i))
}
continue
}
// Extract metadata
if colType == types.ColumnTypeMetadata.String() {
if ident.ColumnType() == types.ColumnTypeMetadata {
switch arr := col.(type) {
case *array.String:
metadata.Set(colName, arr.Value(i))
metadata.Set(shortName, arr.Value(i))
// include structured metadata in stream labels
lbs.Set(colName, arr.Value(i))
lbs.Set(shortName, arr.Value(i))
}
continue
}
// Extract parsed
if colType == types.ColumnTypeParsed.String() {
if ident.ColumnType() == types.ColumnTypeParsed {
switch arr := col.(type) {
case *array.String:
// TODO: keep errors if --strict is set
// These are reserved column names used to track parsing errors. We are dropping them until
// we add support for --strict parsing.
if colName == types.ColumnNameParsedError || colName == types.ColumnNameParsedErrorDetails {
if shortName == types.ColumnNameParsedError || shortName == types.ColumnNameParsedErrorDetails {
continue
}
if parsed.Get(colName) != "" {
if parsed.Get(shortName) != "" {
continue
}
parsed.Set(colName, arr.Value(i))
lbs.Set(colName, arr.Value(i))
if metadata.Get(colName) != "" {
metadata.Del(colName)
parsed.Set(shortName, arr.Value(i))
lbs.Set(shortName, arr.Value(i))
if metadata.Get(shortName) != "" {
metadata.Del(shortName)
}
}
}
@ -282,37 +285,38 @@ func collectSamplesFromRow(builder *labels.Builder, rec arrow.Record, i int) (pr
// TODO: we add a lot of overhead by reading row by row. Switch to vectorized conversion.
for colIdx := range int(rec.NumCols()) {
col := rec.Column(colIdx)
colName := rec.ColumnName(colIdx)
// Ignore column values that are NULL or invalid
if col.IsNull(i) || !col.IsValid(i) {
return promql.Sample{}, false
}
field := rec.Schema().Field(colIdx)
colDataType, ok := field.Metadata.GetValue(types.MetadataKeyColumnDataType)
if !ok {
ident, err := semconv.ParseFQN(field.Name)
if err != nil {
return promql.Sample{}, false
}
switch colName {
case types.ColumnNameBuiltinTimestamp:
if col.IsNull(i) {
return promql.Sample{}, false
}
shortName := ident.ShortName()
// Extract timestamp
if ident.Equal(semconv.ColumnIdentTimestamp) {
// [promql.Sample] expects milliseconds as timestamp unit
sample.T = int64(col.(*array.Timestamp).Value(i) / 1e6)
case types.ColumnNameGeneratedValue:
if col.IsNull(i) {
return promql.Sample{}, false
}
continue
}
if ident.Equal(semconv.ColumnIdentValue) {
col, ok := col.(*array.Float64)
if !ok {
return promql.Sample{}, false
}
sample.F = col.Value(i)
default:
// allow any string columns
if colDataType == types.Loki.String.String() {
builder.Set(colName, col.(*array.String).Value(i))
}
continue
}
// allow any string columns
if ident.DataType() == types.Loki.String {
builder.Set(shortName, col.(*array.String).Value(i))
}
}

@ -8,9 +8,11 @@ import (
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/engine/internal/executor"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logqlmodel"
@ -18,8 +20,6 @@ import (
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
"github.com/grafana/loki/v3/pkg/util/arrowtest"
"github.com/prometheus/prometheus/promql"
"github.com/grafana/loki/pkg/push"
)
@ -27,9 +27,6 @@ func TestStreamsResultBuilder(t *testing.T) {
alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer alloc.AssertSize(t, 0)
mdTypeLabel := types.ColumnMetadata(types.ColumnTypeLabel, types.Loki.String)
mdTypeMetadata := types.ColumnMetadata(types.ColumnTypeMetadata, types.Loki.String)
t.Run("empty builder returns non-nil result", func(t *testing.T) {
builder := newStreamsResultBuilder()
md, _ := metadata.NewContext(t.Context())
@ -37,47 +34,34 @@ func TestStreamsResultBuilder(t *testing.T) {
})
t.Run("rows without log line, timestamp, or labels are ignored", func(t *testing.T) {
schema := arrow.NewSchema(
[]arrow.Field{
{Name: types.ColumnNameBuiltinTimestamp, Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: types.ColumnMetadataBuiltinTimestamp},
{Name: types.ColumnNameBuiltinMessage, Type: arrow.BinaryTypes.String, Metadata: types.ColumnMetadataBuiltinMessage},
{Name: "env", Type: arrow.BinaryTypes.String, Metadata: mdTypeLabel},
},
nil,
)
rows := arrowtest.Rows{
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000000000000001).UTC(), types.ColumnNameBuiltinMessage: nil, "env": "prod"},
{types.ColumnNameBuiltinTimestamp: nil, types.ColumnNameBuiltinMessage: "log line", "env": "prod"},
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000000000000003).UTC(), types.ColumnNameBuiltinMessage: "log line", "env": nil},
}
colTs := semconv.ColumnIdentTimestamp
colMsg := semconv.ColumnIdentMessage
colEnv := semconv.NewIdentifier("env", types.ColumnTypeMetadata, types.Loki.String)
record := rows.Record(alloc, schema)
defer record.Release()
pipeline := executor.NewBufferedPipeline(record)
defer pipeline.Close()
builder := newStreamsResultBuilder()
err := collectResult(context.Background(), pipeline, builder)
require.NoError(t, err)
require.Equal(t, 0, builder.Len(), "expected no entries to be collected")
})
t.Run("fields without metadata are ignored", func(t *testing.T) {
schema := arrow.NewSchema(
[]arrow.Field{
{Name: types.ColumnNameBuiltinTimestamp, Type: arrow.FixedWidthTypes.Timestamp_ns},
{Name: types.ColumnNameBuiltinMessage, Type: arrow.BinaryTypes.String},
{Name: "env", Type: arrow.BinaryTypes.String},
semconv.FieldFromIdent(colTs, true), // in practice timestamp is not nullable!
semconv.FieldFromIdent(colMsg, true), // in practice message is not nullable!
semconv.FieldFromIdent(colEnv, true),
},
nil,
)
rows := arrowtest.Rows{
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000000000000001).UTC(), types.ColumnNameBuiltinMessage: "log line 1", "env": "prod"},
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000000000000002).UTC(), types.ColumnNameBuiltinMessage: "log line 2", "env": "prod"},
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000000000000003).UTC(), types.ColumnNameBuiltinMessage: "log line 3", "env": "prod"},
{
colTs.FQN(): time.Unix(0, 1620000000000000001).UTC(),
colMsg.FQN(): nil,
colEnv.FQN(): "prod",
},
{
colTs.FQN(): nil,
colMsg.FQN(): "log line",
colEnv.FQN(): "prod",
},
{
colTs.FQN(): time.Unix(0, 1620000000000000003).UTC(),
colMsg.FQN(): "log line",
colEnv.FQN(): nil,
},
}
record := rows.Record(alloc, schema)
@ -94,23 +78,58 @@ func TestStreamsResultBuilder(t *testing.T) {
})
t.Run("successful conversion of labels, log line, timestamp, and structured metadata ", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colMsg := semconv.ColumnIdentMessage
colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
colNs := semconv.NewIdentifier("namespace", types.ColumnTypeLabel, types.Loki.String)
colTid := semconv.NewIdentifier("traceID", types.ColumnTypeMetadata, types.Loki.String)
schema := arrow.NewSchema(
[]arrow.Field{
{Name: types.ColumnNameBuiltinTimestamp, Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: types.ColumnMetadataBuiltinTimestamp},
{Name: types.ColumnNameBuiltinMessage, Type: arrow.BinaryTypes.String, Metadata: types.ColumnMetadataBuiltinMessage},
{Name: "env", Type: arrow.BinaryTypes.String, Metadata: mdTypeLabel},
{Name: "namespace", Type: arrow.BinaryTypes.String, Metadata: mdTypeLabel},
{Name: "traceID", Type: arrow.BinaryTypes.String, Metadata: mdTypeMetadata},
semconv.FieldFromIdent(colTs, false),
semconv.FieldFromIdent(colMsg, false),
semconv.FieldFromIdent(colEnv, true),
semconv.FieldFromIdent(colNs, true),
semconv.FieldFromIdent(colTid, true),
},
nil,
)
rows := arrowtest.Rows{
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000000000000001).UTC(), types.ColumnNameBuiltinMessage: "log line 1", "env": "dev", "namespace": "loki-dev-001", "traceID": "860e403fcf754312"},
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000000000000002).UTC(), types.ColumnNameBuiltinMessage: "log line 2", "env": "prod", "namespace": "loki-prod-001", "traceID": "46ce02549441e41c"},
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000000000000003).UTC(), types.ColumnNameBuiltinMessage: "log line 3", "env": "dev", "namespace": "loki-dev-002", "traceID": "61330481e1e59b18"},
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000000000000004).UTC(), types.ColumnNameBuiltinMessage: "log line 4", "env": "prod", "namespace": "loki-prod-001", "traceID": "40e50221e284b9d2"},
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000000000000005).UTC(), types.ColumnNameBuiltinMessage: "log line 5", "env": "dev", "namespace": "loki-dev-002", "traceID": "0cf883f112ad239b"},
{
colTs.FQN(): time.Unix(0, 1620000000000000001).UTC(),
colMsg.FQN(): "log line 1",
colEnv.FQN(): "dev",
colNs.FQN(): "loki-dev-001",
colTid.FQN(): "860e403fcf754312",
},
{
colTs.FQN(): time.Unix(0, 1620000000000000002).UTC(),
colMsg.FQN(): "log line 2",
colEnv.FQN(): "prod",
colNs.FQN(): "loki-prod-001",
colTid.FQN(): "46ce02549441e41c",
},
{
colTs.FQN(): time.Unix(0, 1620000000000000003).UTC(),
colMsg.FQN(): "log line 3",
colEnv.FQN(): "dev",
colNs.FQN(): "loki-dev-002",
colTid.FQN(): "61330481e1e59b18",
},
{
colTs.FQN(): time.Unix(0, 1620000000000000004).UTC(),
colMsg.FQN(): "log line 4",
colEnv.FQN(): "prod",
colNs.FQN(): "loki-prod-001",
colTid.FQN(): "40e50221e284b9d2",
},
{
colTs.FQN(): time.Unix(0, 1620000000000000005).UTC(),
colMsg.FQN(): "log line 5",
colEnv.FQN(): "dev",
colNs.FQN(): "loki-dev-002",
colTid.FQN(): "0cf883f112ad239b",
},
}
record := rows.Record(alloc, schema)
@ -166,7 +185,6 @@ func TestStreamsResultBuilder(t *testing.T) {
}
func TestVectorResultBuilder(t *testing.T) {
mdTypeString := types.ColumnMetadata(types.ColumnTypeAmbiguous, types.Loki.String)
alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer alloc.AssertSize(t, 0)
@ -177,20 +195,24 @@ func TestVectorResultBuilder(t *testing.T) {
})
t.Run("successful conversion of vector data", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colVal := semconv.ColumnIdentValue
colInst := semconv.NewIdentifier("instance", types.ColumnTypeMetadata, types.Loki.String)
colJob := semconv.NewIdentifier("job", types.ColumnTypeMetadata, types.Loki.String)
schema := arrow.NewSchema(
[]arrow.Field{
{Name: types.ColumnNameBuiltinTimestamp, Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: types.ColumnMetadataBuiltinTimestamp},
{Name: types.ColumnNameGeneratedValue, Type: arrow.PrimitiveTypes.Float64, Metadata: types.ColumnMetadata(types.ColumnTypeGenerated, types.Loki.Float)},
{Name: "instance", Type: arrow.BinaryTypes.String, Metadata: mdTypeString},
{Name: "job", Type: arrow.BinaryTypes.String, Metadata: mdTypeString},
semconv.FieldFromIdent(colTs, false),
semconv.FieldFromIdent(colVal, false),
semconv.FieldFromIdent(colInst, false),
semconv.FieldFromIdent(colJob, false),
},
nil,
)
rows := arrowtest.Rows{
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000000000000000).UTC(), types.ColumnNameGeneratedValue: float64(42), "instance": "localhost:9090", "job": "prometheus"},
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000000000000000).UTC(), types.ColumnNameGeneratedValue: float64(23), "instance": "localhost:9100", "job": "node-exporter"},
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000000000000000).UTC(), types.ColumnNameGeneratedValue: float64(15), "instance": "localhost:9100", "job": "prometheus"},
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colVal.FQN(): float64(42), colInst.FQN(): "localhost:9090", colJob.FQN(): "prometheus"},
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colVal.FQN(): float64(23), colInst.FQN(): "localhost:9100", colJob.FQN(): "node-exporter"},
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colVal.FQN(): float64(15), colInst.FQN(): "localhost:9100", colJob.FQN(): "prometheus"},
}
record := rows.Record(alloc, schema)
@ -232,18 +254,21 @@ func TestVectorResultBuilder(t *testing.T) {
// TODO:(ashwanth) also enforce grouping labels are all present?
t.Run("rows without timestamp or value are ignored", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colVal := semconv.ColumnIdentValue
colInst := semconv.NewIdentifier("instance", types.ColumnTypeMetadata, types.Loki.String)
schema := arrow.NewSchema(
[]arrow.Field{
{Name: types.ColumnNameBuiltinTimestamp, Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: types.ColumnMetadataBuiltinTimestamp},
{Name: types.ColumnNameGeneratedValue, Type: arrow.PrimitiveTypes.Float64, Metadata: types.ColumnMetadata(types.ColumnTypeGenerated, types.Loki.Float)},
{Name: "instance", Type: arrow.BinaryTypes.String, Metadata: mdTypeString},
semconv.FieldFromIdent(colTs, false),
semconv.FieldFromIdent(colVal, false),
semconv.FieldFromIdent(colInst, false),
},
nil,
)
rows := arrowtest.Rows{
{types.ColumnNameBuiltinTimestamp: nil, types.ColumnNameGeneratedValue: float64(42), "instance": "localhost:9090"},
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000000000000000).UTC(), types.ColumnNameGeneratedValue: nil, "instance": "localhost:9100"},
{colTs.FQN(): nil, colVal.FQN(): float64(42), colInst.FQN(): "localhost:9090"},
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colVal.FQN(): nil, colInst.FQN(): "localhost:9100"},
}
record := rows.Record(alloc, schema)
@ -261,7 +286,6 @@ func TestVectorResultBuilder(t *testing.T) {
}
func TestMatrixResultBuilder(t *testing.T) {
mdTypeString := types.ColumnMetadata(types.ColumnTypeAmbiguous, types.Loki.String)
alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer alloc.AssertSize(t, 0)
@ -272,23 +296,27 @@ func TestMatrixResultBuilder(t *testing.T) {
})
t.Run("successful conversion of matrix data", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colVal := semconv.ColumnIdentValue
colInst := semconv.NewIdentifier("instance", types.ColumnTypeMetadata, types.Loki.String)
colJob := semconv.NewIdentifier("job", types.ColumnTypeMetadata, types.Loki.String)
schema := arrow.NewSchema(
[]arrow.Field{
{Name: types.ColumnNameBuiltinTimestamp, Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: types.ColumnMetadataBuiltinTimestamp},
{Name: types.ColumnNameGeneratedValue, Type: arrow.PrimitiveTypes.Float64, Metadata: types.ColumnMetadata(types.ColumnTypeGenerated, types.Loki.Float)},
{Name: "instance", Type: arrow.BinaryTypes.String, Metadata: mdTypeString},
{Name: "job", Type: arrow.BinaryTypes.String, Metadata: mdTypeString},
semconv.FieldFromIdent(colTs, false),
semconv.FieldFromIdent(colVal, false),
semconv.FieldFromIdent(colInst, false),
semconv.FieldFromIdent(colJob, false),
},
nil,
)
rows := arrowtest.Rows{
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000000000000000).UTC(), types.ColumnNameGeneratedValue: float64(42), "instance": "localhost:9090", "job": "prometheus"},
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000001000000000).UTC(), types.ColumnNameGeneratedValue: float64(43), "instance": "localhost:9090", "job": "prometheus"},
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000002000000000).UTC(), types.ColumnNameGeneratedValue: float64(44), "instance": "localhost:9090", "job": "prometheus"},
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000000000000000).UTC(), types.ColumnNameGeneratedValue: float64(23), "instance": "localhost:9100", "job": "node-exporter"},
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000001000000000).UTC(), types.ColumnNameGeneratedValue: float64(24), "instance": "localhost:9100", "job": "node-exporter"},
{types.ColumnNameBuiltinTimestamp: time.Unix(0, 1620000002000000000).UTC(), types.ColumnNameGeneratedValue: float64(25), "instance": "localhost:9100", "job": "node-exporter"},
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colVal.FQN(): float64(42), colInst.FQN(): "localhost:9090", colJob.FQN(): "prometheus"},
{colTs.FQN(): time.Unix(0, 1620000001000000000).UTC(), colVal.FQN(): float64(43), colInst.FQN(): "localhost:9090", colJob.FQN(): "prometheus"},
{colTs.FQN(): time.Unix(0, 1620000002000000000).UTC(), colVal.FQN(): float64(44), colInst.FQN(): "localhost:9090", colJob.FQN(): "prometheus"},
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colVal.FQN(): float64(23), colInst.FQN(): "localhost:9100", colJob.FQN(): "node-exporter"},
{colTs.FQN(): time.Unix(0, 1620000001000000000).UTC(), colVal.FQN(): float64(24), colInst.FQN(): "localhost:9100", colJob.FQN(): "node-exporter"},
{colTs.FQN(): time.Unix(0, 1620000002000000000).UTC(), colVal.FQN(): float64(25), colInst.FQN(): "localhost:9100", colJob.FQN(): "node-exporter"},
}
record := rows.Record(alloc, schema)

@ -13,6 +13,7 @@ import (
"github.com/cespare/xxhash/v2"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
)
@ -122,18 +123,8 @@ func (a *aggregator) Add(ts time.Time, value float64, labelValues []string) {
func (a *aggregator) BuildRecord() (arrow.Record, error) {
fields := make([]arrow.Field, 0, len(a.groupBy)+2)
fields = append(fields,
arrow.Field{
Name: types.ColumnNameBuiltinTimestamp,
Type: types.Arrow.Timestamp,
Nullable: false,
Metadata: types.ColumnMetadataBuiltinTimestamp,
},
arrow.Field{
Name: types.ColumnNameGeneratedValue,
Type: types.Arrow.Float,
Nullable: false,
Metadata: types.ColumnMetadata(types.ColumnTypeGenerated, types.Loki.Float),
},
semconv.FieldFromIdent(semconv.ColumnIdentTimestamp, false),
semconv.FieldFromIdent(semconv.ColumnIdentValue, false),
)
for _, column := range a.groupBy {
@ -141,13 +132,8 @@ func (a *aggregator) BuildRecord() (arrow.Record, error) {
if !ok {
panic(fmt.Sprintf("invalid column expression type %T", column))
}
fields = append(fields, arrow.Field{
Name: colExpr.Ref.Column,
Type: types.Arrow.String,
Nullable: true,
Metadata: types.ColumnMetadata(colExpr.Ref.Type, types.Loki.String),
})
ident := semconv.NewIdentifier(colExpr.Ref.Column, colExpr.Ref.Type, types.Loki.String)
fields = append(fields, semconv.FieldFromIdent(ident, true))
}
schema := arrow.NewSchema(fields, nil)

@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/util/arrowtest"
)
@ -30,6 +31,12 @@ var (
)
func TestAggregator(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp.FQN()
colVal := semconv.ColumnIdentValue.FQN()
colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String).FQN()
colSvc := semconv.NewIdentifier("service", types.ColumnTypeLabel, types.Loki.String).FQN()
t.Run("basic SUM aggregation with record building", func(t *testing.T) {
alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer alloc.AssertSize(t, 0)
@ -59,13 +66,13 @@ func TestAggregator(t *testing.T) {
defer record.Release()
expect := arrowtest.Rows{
{"timestamp": ts1, "value": float64(15), "env": "prod", "service": "app1"},
{"timestamp": ts1, "value": float64(20), "env": "prod", "service": "app2"},
{"timestamp": ts1, "value": float64(30), "env": "dev", "service": "app1"},
{colTs: ts1, colVal: float64(15), colEnv: "prod", colSvc: "app1"},
{colTs: ts1, colVal: float64(20), colEnv: "prod", colSvc: "app2"},
{colTs: ts1, colVal: float64(30), colEnv: "dev", colSvc: "app1"},
{"timestamp": ts2, "value": float64(25), "env": "prod", "service": "app1"},
{"timestamp": ts2, "value": float64(25), "env": "prod", "service": "app2"},
{"timestamp": ts2, "value": float64(35), "env": "dev", "service": "app2"},
{colTs: ts2, colVal: float64(25), colEnv: "prod", colSvc: "app1"},
{colTs: ts2, colVal: float64(25), colEnv: "prod", colSvc: "app2"},
{colTs: ts2, colVal: float64(35), colEnv: "dev", colSvc: "app2"},
}
rows, err := arrowtest.RecordRows(record)
@ -110,17 +117,17 @@ func TestAggregator(t *testing.T) {
defer record.Release()
expect := arrowtest.Rows{
{"timestamp": ts1, "value": float64(3), "env": "prod", "service": "app1"},
{"timestamp": ts1, "value": float64(1), "env": "prod", "service": "app2"},
{"timestamp": ts1, "value": float64(1), "env": "dev", "service": "app1"},
{colTs: ts1, colVal: float64(3), colEnv: "prod", colSvc: "app1"},
{colTs: ts1, colVal: float64(1), colEnv: "prod", colSvc: "app2"},
{colTs: ts1, colVal: float64(1), colEnv: "dev", colSvc: "app1"},
{"timestamp": ts2, "value": float64(1), "env": "prod", "service": "app1"},
{"timestamp": ts2, "value": float64(2), "env": "prod", "service": "app2"},
{"timestamp": ts2, "value": float64(1), "env": "dev", "service": "app2"},
{colTs: ts2, colVal: float64(1), colEnv: "prod", colSvc: "app1"},
{colTs: ts2, colVal: float64(2), colEnv: "prod", colSvc: "app2"},
{colTs: ts2, colVal: float64(1), colEnv: "dev", colSvc: "app2"},
{"timestamp": ts3, "value": float64(1), "env": "prod", "service": "app1"},
{"timestamp": ts3, "value": float64(1), "env": "prod", "service": "app2"},
{"timestamp": ts3, "value": float64(1), "env": "dev", "service": "app2"},
{colTs: ts3, colVal: float64(1), colEnv: "prod", colSvc: "app1"},
{colTs: ts3, colVal: float64(1), colEnv: "prod", colSvc: "app2"},
{colTs: ts3, colVal: float64(1), colEnv: "dev", colSvc: "app2"},
}
rows, err := arrowtest.RecordRows(record)
@ -159,13 +166,13 @@ func TestAggregator(t *testing.T) {
defer record.Release()
expect := arrowtest.Rows{
{"timestamp": ts1, "value": float64(15), "env": "prod", "service": "app1"},
{"timestamp": ts1, "value": float64(20), "env": "prod", "service": "app2"},
{"timestamp": ts1, "value": float64(30), "env": "dev", "service": "app1"},
{colTs: ts1, colVal: float64(15), colEnv: "prod", colSvc: "app1"},
{colTs: ts1, colVal: float64(20), colEnv: "prod", colSvc: "app2"},
{colTs: ts1, colVal: float64(30), colEnv: "dev", colSvc: "app1"},
{"timestamp": ts2, "value": float64(15), "env": "prod", "service": "app1"},
{"timestamp": ts2, "value": float64(50), "env": "prod", "service": "app2"},
{"timestamp": ts2, "value": float64(35), "env": "dev", "service": "app2"},
{colTs: ts2, colVal: float64(15), colEnv: "prod", colSvc: "app1"},
{colTs: ts2, colVal: float64(50), colEnv: "prod", colSvc: "app2"},
{colTs: ts2, colVal: float64(35), colEnv: "dev", colSvc: "app2"},
}
rows, err := arrowtest.RecordRows(record)
@ -204,13 +211,13 @@ func TestAggregator(t *testing.T) {
defer record.Release()
expect := arrowtest.Rows{
{"timestamp": ts1, "value": float64(5), "env": "prod", "service": "app1"},
{"timestamp": ts1, "value": float64(20), "env": "prod", "service": "app2"},
{"timestamp": ts1, "value": float64(30), "env": "dev", "service": "app1"},
{colTs: ts1, colVal: float64(5), colEnv: "prod", colSvc: "app1"},
{colTs: ts1, colVal: float64(20), colEnv: "prod", colSvc: "app2"},
{colTs: ts1, colVal: float64(30), colEnv: "dev", colSvc: "app1"},
{"timestamp": ts2, "value": float64(15), "env": "prod", "service": "app1"},
{"timestamp": ts2, "value": float64(25), "env": "prod", "service": "app2"},
{"timestamp": ts2, "value": float64(35), "env": "dev", "service": "app2"},
{colTs: ts2, colVal: float64(15), colEnv: "prod", colSvc: "app1"},
{colTs: ts2, colVal: float64(25), colEnv: "prod", colSvc: "app2"},
{colTs: ts2, colVal: float64(35), colEnv: "dev", colSvc: "app2"},
}
rows, err := arrowtest.RecordRows(record)

@ -17,6 +17,7 @@ import (
"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
)
@ -255,28 +256,13 @@ func makeScalars[S ~[]E, E any](s S) []scalar.Scalar {
func logsColumnToEngineField(col *logs.Column) (arrow.Field, error) {
switch col.Type {
case logs.ColumnTypeTimestamp:
return arrow.Field{
Name: types.ColumnNameBuiltinTimestamp,
Type: types.Arrow.Timestamp,
Nullable: true,
Metadata: types.ColumnMetadata(types.ColumnTypeBuiltin, types.Loki.Timestamp),
}, nil
return semconv.FieldFromIdent(semconv.ColumnIdentTimestamp, true), nil
case logs.ColumnTypeMessage:
return arrow.Field{
Name: types.ColumnNameBuiltinMessage,
Type: types.Arrow.String,
Nullable: true,
Metadata: types.ColumnMetadata(types.ColumnTypeBuiltin, types.Loki.String),
}, nil
return semconv.FieldFromIdent(semconv.ColumnIdentMessage, true), nil
case logs.ColumnTypeMetadata:
return arrow.Field{
Name: col.Name,
Type: types.Arrow.String,
Nullable: true,
Metadata: types.ColumnMetadata(types.ColumnTypeMetadata, types.Loki.String),
}, nil
return semconv.FieldFromIdent(semconv.NewIdentifier(col.Name, types.ColumnTypeMetadata, types.Loki.String), true), nil
}
return arrow.Field{}, fmt.Errorf("unsupported logs column type %s", col.Type)

@ -14,17 +14,13 @@ import (
"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/pkg/push"
)
var (
labelMD = types.ColumnMetadata(types.ColumnTypeLabel, types.Loki.String)
metadataMD = types.ColumnMetadata(types.ColumnTypeMetadata, types.Loki.String)
)
func Test_dataobjScan(t *testing.T) {
obj := buildDataobj(t, []logproto.Stream{
{
@ -89,12 +85,12 @@ func Test_dataobjScan(t *testing.T) {
}, log.NewNopLogger())
expectFields := []arrow.Field{
{Name: "env", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true},
{Name: "service", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true},
{Name: "guid", Type: arrow.BinaryTypes.String, Metadata: metadataMD, Nullable: true},
{Name: "pod", Type: arrow.BinaryTypes.String, Metadata: metadataMD, Nullable: true},
{Name: "timestamp", Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: types.ColumnMetadataBuiltinTimestamp, Nullable: true},
{Name: "message", Type: arrow.BinaryTypes.String, Metadata: types.ColumnMetadataBuiltinMessage, Nullable: true},
semconv.FieldFromFQN("utf8.label.env", true),
semconv.FieldFromFQN("utf8.label.service", true),
semconv.FieldFromFQN("utf8.metadata.guid", true),
semconv.FieldFromFQN("utf8.metadata.pod", true),
semconv.FieldFromFQN("timestamp_ns.builtin.timestamp", true), // should be nullable=false?
semconv.FieldFromFQN("utf8.builtin.message", true), // should be nullable=false?
}
expectCSV := `prod,loki,eeee-ffff-aaaa-bbbb,NULL,1970-01-01 00:00:10,goodbye world
@ -123,8 +119,8 @@ prod,notloki,NULL,notloki-pod-1,1970-01-01 00:00:02,hello world`
}, log.NewNopLogger())
expectFields := []arrow.Field{
{Name: "env", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true},
{Name: "timestamp", Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: types.ColumnMetadataBuiltinTimestamp, Nullable: true},
semconv.FieldFromFQN("utf8.label.env", true),
semconv.FieldFromFQN("timestamp_ns.builtin.timestamp", true), // should be not nullable?
}
expectCSV := `prod,1970-01-01 00:00:10
@ -150,12 +146,12 @@ prod,1970-01-01 00:00:02`
}, log.NewNopLogger())
expectFields := []arrow.Field{
{Name: "env", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true},
{Name: "service", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true},
{Name: "guid", Type: arrow.BinaryTypes.String, Metadata: metadataMD, Nullable: true},
{Name: "pod", Type: arrow.BinaryTypes.String, Metadata: metadataMD, Nullable: true},
{Name: "timestamp", Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: types.ColumnMetadataBuiltinTimestamp, Nullable: true},
{Name: "message", Type: arrow.BinaryTypes.String, Metadata: types.ColumnMetadataBuiltinMessage, Nullable: true},
semconv.FieldFromFQN("utf8.label.env", true),
semconv.FieldFromFQN("utf8.label.service", true),
semconv.FieldFromFQN("utf8.metadata.guid", true),
semconv.FieldFromFQN("utf8.metadata.pod", true),
semconv.FieldFromFQN("timestamp_ns.builtin.timestamp", true), // should be nullable=false?
semconv.FieldFromFQN("utf8.builtin.message", true), // should be nullable=false?
}
expectCSV := `prod,notloki,NULL,notloki-pod-1,1970-01-01 00:00:03,goodbye world
@ -182,7 +178,7 @@ prod,notloki,NULL,notloki-pod-1,1970-01-01 00:00:02,hello world`
}, log.NewNopLogger())
expectFields := []arrow.Field{
{Name: "env", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true},
semconv.FieldFromFQN("utf8.label.env", true),
}
expectCSV := `prod
@ -262,16 +258,16 @@ func Test_dataobjScan_DuplicateColumns(t *testing.T) {
}, log.NewNopLogger())
expectFields := []arrow.Field{
{Name: "env", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true},
{Name: "namespace", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true},
{Name: "pod", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true},
{Name: "service", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true},
semconv.FieldFromFQN("utf8.label.env", true),
semconv.FieldFromFQN("utf8.label.namespace", true),
semconv.FieldFromFQN("utf8.label.pod", true),
semconv.FieldFromFQN("utf8.label.service", true),
{Name: "namespace", Type: arrow.BinaryTypes.String, Metadata: metadataMD, Nullable: true},
{Name: "pod", Type: arrow.BinaryTypes.String, Metadata: metadataMD, Nullable: true},
semconv.FieldFromFQN("utf8.metadata.namespace", true),
semconv.FieldFromFQN("utf8.metadata.pod", true),
{Name: "timestamp", Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: types.ColumnMetadataBuiltinTimestamp, Nullable: true},
{Name: "message", Type: arrow.BinaryTypes.String, Metadata: types.ColumnMetadataBuiltinMessage, Nullable: true},
semconv.FieldFromFQN("timestamp_ns.builtin.timestamp", true),
semconv.FieldFromFQN("utf8.builtin.message", true),
}
expectCSV := `prod,namespace-2,NULL,loki,NULL,NULL,1970-01-01 00:00:03,message 3
@ -297,8 +293,8 @@ prod,NULL,pod-1,loki,NULL,override,1970-01-01 00:00:01,message 1`
}, log.NewNopLogger())
expectFields := []arrow.Field{
{Name: "pod", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true},
{Name: "pod", Type: arrow.BinaryTypes.String, Metadata: metadataMD, Nullable: true},
semconv.FieldFromFQN("utf8.label.pod", true),
semconv.FieldFromFQN("utf8.metadata.pod", true),
}
expectCSV := `NULL,NULL
@ -324,8 +320,8 @@ pod-1,override`
}, log.NewNopLogger())
expectFields := []arrow.Field{
{Name: "namespace", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true},
{Name: "namespace", Type: arrow.BinaryTypes.String, Metadata: metadataMD, Nullable: true},
semconv.FieldFromFQN("utf8.label.namespace", true),
semconv.FieldFromFQN("utf8.metadata.namespace", true),
}
expectCSV := `namespace-2,NULL

@ -9,6 +9,7 @@ import (
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
)
@ -25,75 +26,79 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record)
}, nil
case *physical.ColumnExpr:
fieldIndices := input.Schema().FieldIndices(expr.Ref.Column)
if len(fieldIndices) > 0 {
// For non-ambiguous look-ups, look for an exact match
if expr.Ref.Type != types.ColumnTypeAmbiguous {
for _, idx := range fieldIndices {
field := input.Schema().Field(idx)
dt, ok := field.Metadata.GetValue(types.MetadataKeyColumnDataType)
if !ok {
continue
}
ct, ok := field.Metadata.GetValue(types.MetadataKeyColumnType)
if !ok || ct != expr.Ref.Type.String() {
continue
}
col := input.Column(idx)
col.Retain()
colIdent := semconv.NewIdentifier(expr.Ref.Column, expr.Ref.Type, types.Loki.String)
// For non-ambiguous columns, we can look up the column in the schema by its fully qualified name.
if expr.Ref.Type != types.ColumnTypeAmbiguous {
for idx, field := range input.Schema().Fields() {
ident, err := semconv.ParseFQN(field.Name)
if err != nil {
return nil, fmt.Errorf("failed to parse column %s: %w", field.Name, err)
}
if ident.ShortName() == colIdent.ShortName() && ident.ColumnType() == colIdent.ColumnType() {
arr := input.Column(idx)
arr.Retain()
return &Array{
array: col,
dt: types.MustFromString(dt),
ct: types.ColumnTypeFromString(ct),
array: arr,
dt: ident.DataType(),
ct: ident.ColumnType(),
rows: input.NumRows(),
}, nil
}
} else {
// For ambiguous columns, collect all matching columns and order by precedence
var vecs []ColumnVector
for _, idx := range fieldIndices {
field := input.Schema().Field(idx)
dt, ok := field.Metadata.GetValue(types.MetadataKeyColumnDataType)
if !ok {
continue
}
ct, ok := field.Metadata.GetValue(types.MetadataKeyColumnType)
if !ok {
continue
}
// TODO(ashwanth): Support other data types in CoalesceVector.
// For now, ensure all vectors are strings to avoid type conflicts.
if types.Loki.String.String() != dt {
return nil, fmt.Errorf("column %s has datatype %s, but expression expects string", expr.Ref.Column, dt)
}
col := input.Column(idx)
col.Retain()
vecs = append(vecs, &Array{
array: col,
dt: types.MustFromString(dt),
ct: types.ColumnTypeFromString(ct),
rows: input.NumRows(),
})
}
}
// For ambiguous columns, we need to filter on the name and type and combine matching columns into a CoalesceVector.
if expr.Ref.Type == types.ColumnTypeAmbiguous {
var fieldIndices []int
var fieldIdents []*semconv.Identifier
for idx, field := range input.Schema().Fields() {
ident, err := semconv.ParseFQN(field.Name)
if err != nil {
return nil, fmt.Errorf("failed to parse column %s: %w", field.Name, err)
}
if ident.ShortName() == colIdent.ShortName() {
fieldIndices = append(fieldIndices, idx)
fieldIdents = append(fieldIdents, ident)
}
}
if len(vecs) > 1 {
// Multiple matches - sort by precedence and create CoalesceVector
slices.SortFunc(vecs, func(a, b ColumnVector) int {
return types.ColumnTypePrecedence(a.ColumnType()) - types.ColumnTypePrecedence(b.ColumnType())
})
// Collect all matching columns and order by precedence
var vecs []ColumnVector
for i := range fieldIndices {
idx := fieldIndices[i]
ident := fieldIdents[i]
return &CoalesceVector{
vectors: vecs,
rows: input.NumRows(),
}, nil
} else if len(vecs) == 1 {
return vecs[0], nil
// TODO(ashwanth): Support other data types in CoalesceVector.
// For now, ensure all vectors are strings to avoid type conflicts.
if ident.DataType() != types.Loki.String {
return nil, fmt.Errorf("column %s has datatype %s, but expression expects %s", ident.ShortName(), ident.DataType(), types.Loki.String)
}
arr := input.Column(idx)
arr.Retain()
vecs = append(vecs, &Array{
array: arr,
dt: ident.DataType(),
ct: ident.ColumnType(),
rows: input.NumRows(),
})
}
if len(vecs) == 1 {
return vecs[0], nil
}
if len(vecs) > 1 {
// Multiple matches - sort by precedence and create CoalesceVector
slices.SortFunc(vecs, func(a, b ColumnVector) int {
return types.ColumnTypePrecedence(a.ColumnType()) - types.ColumnTypePrecedence(b.ColumnType())
})
return &CoalesceVector{
vectors: vecs,
rows: input.NumRows(),
}, nil
}
}
@ -125,6 +130,7 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record)
return nil, err
}
defer lhs.Release()
rhs, err := e.eval(expr.Right, input)
if err != nil {
return nil, err

@ -10,15 +10,16 @@ import (
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
)
var (
fields = []arrow.Field{
{Name: "name", Type: types.Arrow.String, Metadata: types.ColumnMetadata(types.ColumnTypeBuiltin, types.Loki.String)},
{Name: "timestamp", Type: types.Arrow.Timestamp, Metadata: types.ColumnMetadata(types.ColumnTypeBuiltin, types.Loki.Timestamp)},
{Name: "value", Type: types.Arrow.Float, Metadata: types.ColumnMetadata(types.ColumnTypeBuiltin, types.Loki.Float)},
{Name: "valid", Type: types.Arrow.Bool, Metadata: types.ColumnMetadata(types.ColumnTypeBuiltin, types.Loki.Bool)},
semconv.FieldFromFQN("utf8.builtin.name", false),
semconv.FieldFromFQN("timestamp_ns.builtin.timestamp", false),
semconv.FieldFromFQN("float64.builtin.value", false),
semconv.FieldFromFQN("bool.builtin.valid", false),
}
sampledata = `Alice,1745487598764058205,0.2586284611568047,false
Bob,1745487598764058305,0.7823145698741236,true
@ -230,8 +231,8 @@ func batch(n int, now time.Time) arrow.Record {
// 2. Define the schema
schema := arrow.NewSchema(
[]arrow.Field{
{Name: "message", Type: types.Arrow.String, Metadata: types.ColumnMetadataBuiltinMessage},
{Name: "timestamp", Type: types.Arrow.Timestamp, Metadata: types.ColumnMetadataBuiltinTimestamp},
semconv.FieldFromIdent(semconv.ColumnIdentMessage, false),
semconv.FieldFromIdent(semconv.ColumnIdentTimestamp, false),
},
nil, // No metadata
)
@ -272,9 +273,9 @@ func batch(n int, now time.Time) arrow.Record {
func TestEvaluateAmbiguousColumnExpression(t *testing.T) {
// Test precedence between generated, metadata, and label columns
fields := []arrow.Field{
{Name: "test", Type: arrow.BinaryTypes.String, Metadata: types.ColumnMetadata(types.ColumnTypeLabel, types.Loki.String)},
{Name: "test", Type: arrow.BinaryTypes.String, Metadata: types.ColumnMetadata(types.ColumnTypeMetadata, types.Loki.String)},
{Name: "test", Type: arrow.BinaryTypes.String, Metadata: types.ColumnMetadata(types.ColumnTypeGenerated, types.Loki.String)},
semconv.FieldFromFQN("utf8.label.test", true),
semconv.FieldFromFQN("utf8.metadata.test", true),
semconv.FieldFromFQN("utf8.generated.test", true),
}
// CSV data where:
@ -340,7 +341,7 @@ null,null,null`
t.Run("look-up matching single column should return Array", func(t *testing.T) {
// Create a record with only one column type
fields := []arrow.Field{
{Name: "single", Type: arrow.BinaryTypes.String, Metadata: types.ColumnMetadata(types.ColumnTypeLabel, types.Loki.String)},
semconv.FieldFromFQN("utf8.label.single", false),
}
data := `label_0
label_1

@ -8,14 +8,18 @@ import (
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/util/arrowtest"
)
func TestNewFilterPipeline(t *testing.T) {
colName := "utf8.builtin.name"
colValid := "bool.builtin.valid"
fields := []arrow.Field{
{Name: "name", Type: arrow.BinaryTypes.String, Metadata: types.ColumnMetadata(types.ColumnTypeBuiltin, types.Loki.String)},
{Name: "valid", Type: arrow.FixedWidthTypes.Boolean, Metadata: types.ColumnMetadata(types.ColumnTypeBuiltin, types.Loki.Bool)},
semconv.FieldFromFQN(colName, true),
semconv.FieldFromFQN(colValid, true),
}
t.Run("filter with true literal predicate", func(t *testing.T) {
@ -27,9 +31,9 @@ func TestNewFilterPipeline(t *testing.T) {
// Create input data using arrowtest.Rows
inputRows := []arrowtest.Rows{
{
{"name": "Alice", "valid": true},
{"name": "Bob", "valid": false},
{"name": "Charlie", "valid": true},
{colName: "Alice", colValid: true},
{colName: "Bob", colValid: false},
{colName: "Charlie", colValid: true},
},
}
input := NewArrowtestPipeline(alloc, schema, inputRows...)
@ -66,9 +70,9 @@ func TestNewFilterPipeline(t *testing.T) {
// Create input data using arrowtest.Rows
inputRows := []arrowtest.Rows{
{
{"name": "Alice", "valid": true},
{"name": "Bob", "valid": false},
{"name": "Charlie", "valid": true},
{colName: "Alice", colValid: true},
{colName: "Bob", colValid: false},
{colName: "Charlie", colValid: true},
},
}
@ -104,9 +108,9 @@ func TestNewFilterPipeline(t *testing.T) {
// Create input data using arrowtest.Rows
inputRows := []arrowtest.Rows{
{
{"name": "Alice", "valid": true},
{"name": "Bob", "valid": false},
{"name": "Charlie", "valid": true},
{colName: "Alice", colValid: true},
{colName: "Bob", colValid: false},
{colName: "Charlie", colValid: true},
},
}
input := NewArrowtestPipeline(alloc, schema, inputRows...)
@ -128,8 +132,8 @@ func TestNewFilterPipeline(t *testing.T) {
// Create expected output (only rows where valid=true)
expectedRows := arrowtest.Rows{
{"name": "Alice", "valid": true},
{"name": "Charlie", "valid": true},
{colName: "Alice", colValid: true},
{colName: "Charlie", colValid: true},
}
// Read the pipeline output
@ -152,10 +156,10 @@ func TestNewFilterPipeline(t *testing.T) {
// Create input data using arrowtest.Rows
inputRows := []arrowtest.Rows{
{
{"name": "Alice", "valid": true},
{"name": "Bob", "valid": false},
{"name": "Bob", "valid": true},
{"name": "Charlie", "valid": false},
{colName: "Alice", colValid: true},
{colName: "Bob", colValid: false},
{colName: "Bob", colValid: true},
{colName: "Charlie", colValid: false},
},
}
input := NewArrowtestPipeline(alloc, schema, inputRows...)
@ -183,7 +187,7 @@ func TestNewFilterPipeline(t *testing.T) {
// Create expected output (only rows where name=="Bob" AND valid!=false)
expectedRows := arrowtest.Rows{
{"name": "Bob", "valid": true},
{colName: "Bob", colValid: true},
}
// Read the pipeline output
@ -241,12 +245,12 @@ func TestNewFilterPipeline(t *testing.T) {
// Create input data split across multiple batches using arrowtest.Rows
inputRows := []arrowtest.Rows{
{
{"name": "Alice", "valid": true},
{"name": "Bob", "valid": false},
{colName: "Alice", colValid: true},
{colName: "Bob", colValid: false},
},
{
{"name": "Charlie", "valid": true},
{"name": "Dave", "valid": false},
{colName: "Charlie", colValid: true},
{colName: "Dave", colValid: false},
},
}
input := NewArrowtestPipeline(alloc, schema, inputRows...)
@ -268,8 +272,8 @@ func TestNewFilterPipeline(t *testing.T) {
// Create expected output (only rows where valid=true)
expectedRows := arrowtest.Rows{
{"name": "Alice", "valid": true},
{"name": "Charlie", "valid": true},
{colName: "Alice", colValid: true},
{colName: "Charlie", colValid: true},
}
// Read the pipeline output
@ -304,9 +308,9 @@ func TestNewFilterPipeline(t *testing.T) {
// Create input data with null values
inputRows := []arrowtest.Rows{
{
{"name": "Alice", "valid": true},
{"name": nil, "valid": true}, // null name
{"name": "Bob", "valid": false},
{colName: "Alice", colValid: true},
{colName: nil, colValid: true}, // null name
{colName: "Bob", colValid: false},
},
}
input := NewArrowtestPipeline(alloc, schema, inputRows...)
@ -328,8 +332,8 @@ func TestNewFilterPipeline(t *testing.T) {
// Create expected output (only rows where valid=true, including null name)
expectedRows := arrowtest.Rows{
{"name": "Alice", "valid": true},
{"name": nil, "valid": true}, // null name should be retained
{colName: "Alice", colValid: true},
{colName: nil, colValid: true}, // null name should be retained
}
// Read the pipeline output

@ -11,6 +11,7 @@ import (
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
)
@ -28,22 +29,14 @@ func NewParsePipeline(parse *physical.ParseNode, input Pipeline, allocator memor
defer batch.Release()
// Find the message column
messageColIdx := -1
schema := batch.Schema()
for i := 0; i < schema.NumFields(); i++ {
if schema.Field(i).Name == types.ColumnNameBuiltinMessage {
messageColIdx = i
break
}
}
if messageColIdx == -1 {
return failureState(fmt.Errorf("message column not found"))
msgCol, msgIdx, err := columnForIdent(semconv.ColumnIdentMessage, batch)
if err != nil {
return failureState(err)
}
messageCol := batch.Column(messageColIdx)
stringCol, ok := messageCol.(*array.String)
stringCol, ok := msgCol.(*array.String)
if !ok {
return failureState(fmt.Errorf("message column is not a string column"))
return failureState(fmt.Errorf("column %s must be of type utf8, got %s", semconv.ColumnIdentMessage.FQN(), batch.Schema().Field(msgIdx)))
}
var headers []string
@ -58,23 +51,18 @@ func NewParsePipeline(parse *physical.ParseNode, input Pipeline, allocator memor
}
// Build new schema with original fields plus parsed fields
schema := batch.Schema()
newFields := make([]arrow.Field, 0, schema.NumFields()+len(headers))
for i := 0; i < schema.NumFields(); i++ {
newFields = append(newFields, schema.Field(i))
}
for _, header := range headers {
// Add metadata to mark these as parsed columns
metadata := types.ColumnMetadata(
types.ColumnTypeParsed,
types.Loki.String,
)
newFields = append(newFields, arrow.Field{
Name: header,
Type: arrow.BinaryTypes.String,
Metadata: metadata,
Nullable: true,
})
ct := types.ColumnTypeParsed
if header == semconv.ColumnIdentError.ShortName() || header == semconv.ColumnIdentErrorDetails.ShortName() {
ct = types.ColumnTypeGenerated
}
ident := semconv.NewIdentifier(header, ct, types.Loki.String)
newFields = append(newFields, semconv.FieldFromIdent(ident, true))
}
newSchema := arrow.NewSchema(newFields, nil)
@ -151,9 +139,13 @@ func parseLines(input *array.String, requestedKeys []string, columnBuilders map[
if !hasErrorColumns {
errorBuilder = array.NewStringBuilder(allocator)
errorDetailsBuilder = array.NewStringBuilder(allocator)
columnBuilders[types.ColumnNameParsedError] = errorBuilder
columnBuilders[types.ColumnNameParsedErrorDetails] = errorDetailsBuilder
columnOrder = append(columnOrder, types.ColumnNameParsedError, types.ColumnNameParsedErrorDetails)
columnBuilders[semconv.ColumnIdentError.ShortName()] = errorBuilder
columnBuilders[semconv.ColumnIdentErrorDetails.ShortName()] = errorDetailsBuilder
columnOrder = append(
columnOrder,
semconv.ColumnIdentError.ShortName(),
semconv.ColumnIdentErrorDetails.ShortName(),
)
hasErrorColumns = true
// Backfill NULLs for previous rows
@ -175,17 +167,17 @@ func parseLines(input *array.String, requestedKeys []string, columnBuilders map[
}
// Track which keys we've seen this row
seenKeys := make(map[string]bool)
seenKeys := make(map[string]struct{})
if hasErrorColumns {
// Mark error columns as seen so we don't append nulls for them
seenKeys[types.ColumnNameParsedError] = true
seenKeys[types.ColumnNameParsedErrorDetails] = true
seenKeys[semconv.ColumnIdentError.ShortName()] = struct{}{}
seenKeys[semconv.ColumnIdentErrorDetails.ShortName()] = struct{}{}
}
// Add values for parsed keys (only if no error)
if err == nil {
for key, value := range parsed {
seenKeys[key] = true
seenKeys[key] = struct{}{}
builder, exists := columnBuilders[key]
if !exists {
// New column discovered - create and backfill
@ -203,7 +195,7 @@ func parseLines(input *array.String, requestedKeys []string, columnBuilders map[
// Append NULLs for columns not in this row
for _, key := range columnOrder {
if !seenKeys[key] {
if _, found := seenKeys[key]; !found {
columnBuilders[key].AppendNull()
}
}
@ -214,12 +206,12 @@ func parseLines(input *array.String, requestedKeys []string, columnBuilders map[
// Keep error columns at the end, sort the rest
nonErrorColumns := make([]string, 0, len(columnOrder)-2)
for _, key := range columnOrder {
if key != types.ColumnNameParsedError && key != types.ColumnNameParsedErrorDetails {
if key != semconv.ColumnIdentError.ShortName() && key != semconv.ColumnIdentErrorDetails.ShortName() {
nonErrorColumns = append(nonErrorColumns, key)
}
}
sort.Strings(nonErrorColumns)
columnOrder = append(nonErrorColumns, types.ColumnNameParsedError, types.ColumnNameParsedErrorDetails)
columnOrder = append(nonErrorColumns, semconv.ColumnIdentError.ShortName(), semconv.ColumnIdentErrorDetails.ShortName())
} else {
sort.Strings(columnOrder)
}

@ -9,11 +9,17 @@ import (
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/util/arrowtest"
)
func TestNewParsePipeline_logfmt(t *testing.T) {
var (
colTs = "timestamp_ns.builtin.timestamp"
colMsg = "utf8.builtin.message"
)
for _, tt := range []struct {
name string
schema *arrow.Schema
@ -25,111 +31,211 @@ func TestNewParsePipeline_logfmt(t *testing.T) {
{
name: "parse stage transforms records, adding columns parsed from message",
schema: arrow.NewSchema([]arrow.Field{
{Name: types.ColumnNameBuiltinMessage, Type: arrow.BinaryTypes.String},
semconv.FieldFromFQN("utf8.builtin.message", true),
}, nil),
input: arrowtest.Rows{
{"message": "level=error status=500"},
{"message": "level=info status=200"},
{"message": "level=debug status=201"},
{colMsg: "level=error status=500"},
{colMsg: "level=info status=200"},
{colMsg: "level=debug status=201"},
},
requestedKeys: []string{"level", "status"},
expectedFields: 3, // 3 columns: message, level, status
expectedOutput: arrowtest.Rows{
{"message": "level=error status=500", "level": "error", "status": "500"},
{"message": "level=info status=200", "level": "info", "status": "200"},
{"message": "level=debug status=201", "level": "debug", "status": "201"},
{
colMsg: "level=error status=500",
"utf8.parsed.level": "error",
"utf8.parsed.status": "500",
},
{
colMsg: "level=info status=200",
"utf8.parsed.level": "info",
"utf8.parsed.status": "200",
},
{
colMsg: "level=debug status=201",
"utf8.parsed.level": "debug",
"utf8.parsed.status": "201",
},
},
},
{
name: "parse stage preserves existing columns",
schema: arrow.NewSchema([]arrow.Field{
{Name: types.ColumnNameBuiltinTimestamp, Type: arrow.FixedWidthTypes.Timestamp_ns},
{Name: types.ColumnNameBuiltinMessage, Type: arrow.BinaryTypes.String},
{Name: "app", Type: arrow.BinaryTypes.String},
semconv.FieldFromFQN("timestamp_ns.builtin.timestamp", true),
semconv.FieldFromFQN("utf8.builtin.message", true),
semconv.FieldFromFQN("utf8.label.app", true),
}, nil),
input: arrowtest.Rows{
{"timestamp": time.Unix(1, 0).UTC(), "message": "level=error status=500", "app": "frontend"},
{"timestamp": time.Unix(2, 0).UTC(), "message": "level=info status=200", "app": "backend"},
{colTs: time.Unix(1, 0).UTC(), colMsg: "level=error status=500", "utf8.label.app": "frontend"},
{colTs: time.Unix(2, 0).UTC(), colMsg: "level=info status=200", "utf8.label.app": "backend"},
},
requestedKeys: []string{"level", "status"},
expectedFields: 5, // 5 columns: timestamp, message, app, level, status
expectedOutput: arrowtest.Rows{
{"timestamp": time.Unix(1, 0).UTC(), "message": "level=error status=500", "app": "frontend", "level": "error", "status": "500"},
{"timestamp": time.Unix(2, 0).UTC(), "message": "level=info status=200", "app": "backend", "level": "info", "status": "200"},
{
colTs: time.Unix(1, 0).UTC(),
colMsg: "level=error status=500",
"utf8.label.app": "frontend",
"utf8.parsed.level": "error",
"utf8.parsed.status": "500",
},
{
colTs: time.Unix(2, 0).UTC(),
colMsg: "level=info status=200",
"utf8.label.app": "backend",
"utf8.parsed.level": "info",
"utf8.parsed.status": "200",
},
},
},
{
name: "handle missing keys with NULL",
schema: arrow.NewSchema([]arrow.Field{
{Name: types.ColumnNameBuiltinMessage, Type: arrow.BinaryTypes.String},
semconv.FieldFromFQN("utf8.builtin.message", true),
}, nil),
input: arrowtest.Rows{
{"message": "level=error"},
{"message": "status=200"},
{"message": "level=info"},
{colMsg: "level=error"},
{colMsg: "status=200"},
{colMsg: "level=info"},
},
requestedKeys: []string{"level"},
expectedFields: 2, // 2 columns: message, level
expectedOutput: arrowtest.Rows{
{"message": "level=error", "level": "error"},
{"message": "status=200", "level": nil},
{"message": "level=info", "level": "info"},
{
colMsg: "level=error",
"utf8.parsed.level": "error",
},
{
colMsg: "status=200",
"utf8.parsed.level": nil,
},
{
colMsg: "level=info",
"utf8.parsed.level": "info",
},
},
},
{
name: "handle errors with error columns",
schema: arrow.NewSchema([]arrow.Field{
{Name: types.ColumnNameBuiltinMessage, Type: arrow.BinaryTypes.String},
semconv.FieldFromFQN("utf8.builtin.message", true),
}, nil),
input: arrowtest.Rows{
{"message": "level=info status=200"}, // No errors
{"message": "status==value level=error"}, // Double equals error on requested key
{"message": "level=\"unclosed status=500"}, // Unclosed quote error
{colMsg: "level=info status=200"}, // No errors
{colMsg: "status==value level=error"}, // Double equals error on requested key
{colMsg: "level=\"unclosed status=500"}, // Unclosed quote error
},
requestedKeys: []string{"level", "status"},
expectedFields: 5, // 5 columns: message, level, status, __error__, __error_details__
expectedOutput: arrowtest.Rows{
{"message": "level=info status=200", "level": "info", "status": "200", types.ColumnNameParsedError: nil, types.ColumnNameParsedErrorDetails: nil},
{"message": "status==value level=error", "level": nil, "status": nil, types.ColumnNameParsedError: types.LogfmtParserErrorType, types.ColumnNameParsedErrorDetails: "logfmt syntax error at pos 8 : unexpected '='"},
{"message": "level=\"unclosed status=500", "level": nil, "status": nil, types.ColumnNameParsedError: types.LogfmtParserErrorType, types.ColumnNameParsedErrorDetails: "logfmt syntax error at pos 27 : unterminated quoted value"},
{
colMsg: "level=info status=200",
"utf8.parsed.level": "info",
"utf8.parsed.status": "200",
semconv.ColumnIdentError.FQN(): nil,
semconv.ColumnIdentErrorDetails.FQN(): nil,
},
{
colMsg: "status==value level=error",
"utf8.parsed.level": nil,
"utf8.parsed.status": nil,
semconv.ColumnIdentError.FQN(): types.LogfmtParserErrorType,
semconv.ColumnIdentErrorDetails.FQN(): "logfmt syntax error at pos 8 : unexpected '='",
},
{
colMsg: "level=\"unclosed status=500",
"utf8.parsed.level": nil,
"utf8.parsed.status": nil,
semconv.ColumnIdentError.FQN(): types.LogfmtParserErrorType,
semconv.ColumnIdentErrorDetails.FQN(): "logfmt syntax error at pos 27 : unterminated quoted value",
},
},
},
{
name: "extract all keys when none requested",
schema: arrow.NewSchema([]arrow.Field{
{Name: types.ColumnNameBuiltinMessage, Type: arrow.BinaryTypes.String},
semconv.FieldFromFQN("utf8.builtin.message", true),
}, nil),
input: arrowtest.Rows{
{"message": "level=info status=200 method=GET"},
{"message": "level=warn code=304"},
{"message": "level=error status=500 method=POST duration=123ms"},
{colMsg: "level=info status=200 method=GET"},
{colMsg: "level=warn code=304"},
{colMsg: "level=error status=500 method=POST duration=123ms"},
},
requestedKeys: nil, // nil means extract all keys
expectedFields: 6, // 6 columns: message, code, duration, level, method, status
expectedOutput: arrowtest.Rows{
{"message": "level=info status=200 method=GET", "code": nil, "duration": nil, "level": "info", "method": "GET", "status": "200"},
{"message": "level=warn code=304", "code": "304", "duration": nil, "level": "warn", "method": nil, "status": nil},
{"message": "level=error status=500 method=POST duration=123ms", "code": nil, "duration": "123ms", "level": "error", "method": "POST", "status": "500"},
{
colMsg: "level=info status=200 method=GET",
"utf8.parsed.code": nil,
"utf8.parsed.duration": nil,
"utf8.parsed.level": "info",
"utf8.parsed.method": "GET",
"utf8.parsed.status": "200",
},
{
colMsg: "level=warn code=304",
"utf8.parsed.code": "304",
"utf8.parsed.duration": nil,
"utf8.parsed.level": "warn",
"utf8.parsed.method": nil,
"utf8.parsed.status": nil,
},
{
colMsg: "level=error status=500 method=POST duration=123ms",
"utf8.parsed.code": nil,
"utf8.parsed.duration": "123ms",
"utf8.parsed.level": "error",
"utf8.parsed.method": "POST",
"utf8.parsed.status": "500",
},
},
},
{
name: "extract all keys with errors when none requested",
schema: arrow.NewSchema([]arrow.Field{
{Name: types.ColumnNameBuiltinMessage, Type: arrow.BinaryTypes.String},
semconv.FieldFromFQN("utf8.builtin.message", true),
}, nil),
input: arrowtest.Rows{
{"message": "level=info status=200 method=GET"}, // Valid line
{"message": "level==error code=500"}, // Double equals error
{"message": "msg=\"unclosed duration=100ms code=400"}, // Unclosed quote error
{"message": "level=debug method=POST"}, // Valid line
{colMsg: "level=info status=200 method=GET"}, // Valid line
{colMsg: "level==error code=500"}, // Double equals error
{colMsg: "msg=\"unclosed duration=100ms code=400"}, // Unclosed quote error
{colMsg: "level=debug method=POST"}, // Valid line
},
requestedKeys: nil, // nil means extract all keys
expectedFields: 6, // 6 columns: message, level, method, status, __error__, __error_details__
expectedOutput: arrowtest.Rows{
{"message": "level=info status=200 method=GET", "level": "info", "method": "GET", "status": "200", types.ColumnNameParsedError: nil, types.ColumnNameParsedErrorDetails: nil},
{"message": "level==error code=500", "level": nil, "method": nil, "status": nil, types.ColumnNameParsedError: types.LogfmtParserErrorType, types.ColumnNameParsedErrorDetails: "logfmt syntax error at pos 7 : unexpected '='"},
{"message": "msg=\"unclosed duration=100ms code=400", "level": nil, "method": nil, "status": nil, types.ColumnNameParsedError: types.LogfmtParserErrorType, types.ColumnNameParsedErrorDetails: "logfmt syntax error at pos 38 : unterminated quoted value"},
{"message": "level=debug method=POST", "level": "debug", "method": "POST", "status": nil, types.ColumnNameParsedError: nil, types.ColumnNameParsedErrorDetails: nil},
{
colMsg: "level=info status=200 method=GET",
"utf8.parsed.level": "info",
"utf8.parsed.method": "GET",
"utf8.parsed.status": "200",
semconv.ColumnIdentError.FQN(): nil,
semconv.ColumnIdentErrorDetails.FQN(): nil,
},
{
colMsg: "level==error code=500",
"utf8.parsed.level": nil,
"utf8.parsed.method": nil,
"utf8.parsed.status": nil,
semconv.ColumnIdentError.FQN(): types.LogfmtParserErrorType,
semconv.ColumnIdentErrorDetails.FQN(): "logfmt syntax error at pos 7 : unexpected '='",
},
{
colMsg: "msg=\"unclosed duration=100ms code=400",
"utf8.parsed.level": nil,
"utf8.parsed.method": nil,
"utf8.parsed.status": nil,
semconv.ColumnIdentError.FQN(): types.LogfmtParserErrorType,
semconv.ColumnIdentErrorDetails.FQN(): "logfmt syntax error at pos 38 : unterminated quoted value",
},
{
colMsg: "level=debug method=POST",
"utf8.parsed.level": "debug",
"utf8.parsed.method": "POST",
"utf8.parsed.status": nil,
semconv.ColumnIdentError.FQN(): nil,
semconv.ColumnIdentErrorDetails.FQN(): nil,
},
},
},
} {
@ -171,6 +277,11 @@ func TestNewParsePipeline_logfmt(t *testing.T) {
}
func TestNewParsePipeline_JSON(t *testing.T) {
var (
colTs = "timestamp_ns.builtin.timestamp"
colMsg = "utf8.builtin.message"
)
for _, tt := range []struct {
name string
schema *arrow.Schema
@ -182,181 +293,342 @@ func TestNewParsePipeline_JSON(t *testing.T) {
{
name: "parse stage transforms records, adding columns parsed from message",
schema: arrow.NewSchema([]arrow.Field{
{Name: types.ColumnNameBuiltinMessage, Type: arrow.BinaryTypes.String},
semconv.FieldFromFQN("utf8.builtin.message", true),
}, nil),
input: arrowtest.Rows{
{"message": `{"level": "error", "status": "500"}`},
{"message": `{"level": "info", "status": "200"}`},
{"message": `{"level": "debug", "status": "201"}`},
{colMsg: `{"level": "error", "status": "500"}`},
{colMsg: `{"level": "info", "status": "200"}`},
{colMsg: `{"level": "debug", "status": "201"}`},
},
requestedKeys: []string{"level", "status"},
expectedFields: 3, // 3 columns: message, level, status
expectedOutput: arrowtest.Rows{
{"message": `{"level": "error", "status": "500"}`, "level": "error", "status": "500"},
{"message": `{"level": "info", "status": "200"}`, "level": "info", "status": "200"},
{"message": `{"level": "debug", "status": "201"}`, "level": "debug", "status": "201"},
{colMsg: `{"level": "error", "status": "500"}`, "utf8.parsed.level": "error", "utf8.parsed.status": "500"},
{colMsg: `{"level": "info", "status": "200"}`, "utf8.parsed.level": "info", "utf8.parsed.status": "200"},
{colMsg: `{"level": "debug", "status": "201"}`, "utf8.parsed.level": "debug", "utf8.parsed.status": "201"},
},
},
{
name: "parse stage preserves existing columns",
schema: arrow.NewSchema([]arrow.Field{
{Name: types.ColumnNameBuiltinTimestamp, Type: arrow.FixedWidthTypes.Timestamp_ns},
{Name: types.ColumnNameBuiltinMessage, Type: arrow.BinaryTypes.String},
{Name: "app", Type: arrow.BinaryTypes.String},
semconv.FieldFromFQN("timestamp_ns.builtin.timestamp", true),
semconv.FieldFromFQN("utf8.builtin.message", true),
semconv.FieldFromFQN("utf8.label.app", true),
}, nil),
input: arrowtest.Rows{
{"timestamp": time.Unix(1, 0).UTC(), "message": `{"level": "error", "status": "500"}`, "app": "frontend"},
{"timestamp": time.Unix(2, 0).UTC(), "message": `{"level": "info", "status": "200"}`, "app": "backend"},
{colTs: time.Unix(1, 0).UTC(), colMsg: `{"level": "error", "status": "500"}`, "utf8.label.app": "frontend"},
{colTs: time.Unix(2, 0).UTC(), colMsg: `{"level": "info", "status": "200"}`, "utf8.label.app": "backend"},
},
requestedKeys: []string{"level", "status"},
expectedFields: 5, // 5 columns: timestamp, message, app, level, status
expectedOutput: arrowtest.Rows{
{"timestamp": time.Unix(1, 0).UTC(), "message": `{"level": "error", "status": "500"}`, "app": "frontend", "level": "error", "status": "500"},
{"timestamp": time.Unix(2, 0).UTC(), "message": `{"level": "info", "status": "200"}`, "app": "backend", "level": "info", "status": "200"},
{colTs: time.Unix(1, 0).UTC(), colMsg: `{"level": "error", "status": "500"}`, "utf8.label.app": "frontend", "utf8.parsed.level": "error", "utf8.parsed.status": "500"},
{colTs: time.Unix(2, 0).UTC(), colMsg: `{"level": "info", "status": "200"}`, "utf8.label.app": "backend", "utf8.parsed.level": "info", "utf8.parsed.status": "200"},
},
},
{
name: "handle missing keys with NULL",
schema: arrow.NewSchema([]arrow.Field{
{Name: types.ColumnNameBuiltinMessage, Type: arrow.BinaryTypes.String},
semconv.FieldFromFQN("utf8.builtin.message", true),
}, nil),
input: arrowtest.Rows{
{"message": `{"level": "error"}`},
{"message": `{"status": "200"}`},
{"message": `{"level": "info"}`},
{colMsg: `{"level": "error"}`},
{colMsg: `{"status": "200"}`},
{colMsg: `{"level": "info"}`},
},
requestedKeys: []string{"level"},
expectedFields: 2, // 2 columns: message, level
expectedOutput: arrowtest.Rows{
{"message": `{"level": "error"}`, "level": "error"},
{"message": `{"status": "200"}`, "level": nil},
{"message": `{"level": "info"}`, "level": "info"},
{colMsg: `{"level": "error"}`, "utf8.parsed.level": "error"},
{colMsg: `{"status": "200"}`, "utf8.parsed.level": nil},
{colMsg: `{"level": "info"}`, "utf8.parsed.level": "info"},
},
},
{
name: "handle errors with error columns",
schema: arrow.NewSchema([]arrow.Field{
{Name: types.ColumnNameBuiltinMessage, Type: arrow.BinaryTypes.String},
semconv.FieldFromFQN("utf8.builtin.message", true),
}, nil),
input: arrowtest.Rows{
{"message": `{"level": "info", "status": "200"}`}, // No errors
{"message": `{"level": "error", "status":`}, // Missing closing brace and value
{"message": `{"level": "info", "status": 200}`}, // Number should be converted to string
{colMsg: `{"level": "info", "status": "200"}`}, // No errors
{colMsg: `{"level": "error", "status":`}, // Missing closing brace and value
{colMsg: `{"level": "info", "status": 200}`}, // Number should be converted to string
},
requestedKeys: []string{"level", "status"},
expectedFields: 5, // 5 columns: message, level, status, __error__, __error_details__ (due to malformed JSON)
expectedOutput: arrowtest.Rows{
{"message": `{"level": "info", "status": "200"}`, "level": "info", "status": "200", types.ColumnNameParsedError: nil, types.ColumnNameParsedErrorDetails: nil},
{"message": `{"level": "error", "status":`, "level": nil, "status": nil, types.ColumnNameParsedError: "JSONParserErr", types.ColumnNameParsedErrorDetails: "Malformed JSON error"},
{"message": `{"level": "info", "status": 200}`, "level": "info", "status": "200", types.ColumnNameParsedError: nil, types.ColumnNameParsedErrorDetails: nil},
{
colMsg: `{"level": "info", "status": "200"}`,
"utf8.parsed.level": "info",
"utf8.parsed.status": "200",
semconv.ColumnIdentError.FQN(): nil,
semconv.ColumnIdentErrorDetails.FQN(): nil,
},
{
colMsg: `{"level": "error", "status":`,
"utf8.parsed.level": nil,
"utf8.parsed.status": nil,
semconv.ColumnIdentError.FQN(): "JSONParserErr",
semconv.ColumnIdentErrorDetails.FQN(): "Malformed JSON error",
},
{
colMsg: `{"level": "info", "status": 200}`,
"utf8.parsed.level": "info",
"utf8.parsed.status": "200",
semconv.ColumnIdentError.FQN(): nil,
semconv.ColumnIdentErrorDetails.FQN(): nil,
},
},
},
{
name: "extract all keys when none requested",
schema: arrow.NewSchema([]arrow.Field{
{Name: types.ColumnNameBuiltinMessage, Type: arrow.BinaryTypes.String},
semconv.FieldFromFQN("utf8.builtin.message", true),
}, nil),
input: arrowtest.Rows{
{"message": `{"level": "info", "status": "200", "method": "GET"}`},
{"message": `{"level": "warn", "code": "304"}`},
{"message": `{"level": "error", "status": "500", "method": "POST", "duration": "123ms"}`},
{colMsg: `{"level": "info", "status": "200", "method": "GET"}`},
{colMsg: `{"level": "warn", "code": "304"}`},
{colMsg: `{"level": "error", "status": "500", "method": "POST", "duration": "123ms"}`},
},
requestedKeys: nil, // nil means extract all keys
expectedFields: 6, // 6 columns: message, code, duration, level, method, status
expectedOutput: arrowtest.Rows{
{"message": `{"level": "info", "status": "200", "method": "GET"}`, "code": nil, "duration": nil, "level": "info", "method": "GET", "status": "200"},
{"message": `{"level": "warn", "code": "304"}`, "code": "304", "duration": nil, "level": "warn", "method": nil, "status": nil},
{"message": `{"level": "error", "status": "500", "method": "POST", "duration": "123ms"}`, "code": nil, "duration": "123ms", "level": "error", "method": "POST", "status": "500"},
{
colMsg: `{"level": "info", "status": "200", "method": "GET"}`,
"utf8.parsed.code": nil,
"utf8.parsed.duration": nil,
"utf8.parsed.level": "info",
"utf8.parsed.method": "GET",
"utf8.parsed.status": "200",
},
{
colMsg: `{"level": "warn", "code": "304"}`,
"utf8.parsed.code": "304",
"utf8.parsed.duration": nil,
"utf8.parsed.level": "warn",
"utf8.parsed.method": nil,
"utf8.parsed.status": nil,
},
{
colMsg: `{"level": "error", "status": "500", "method": "POST", "duration": "123ms"}`,
"utf8.parsed.code": nil,
"utf8.parsed.duration": "123ms",
"utf8.parsed.level": "error",
"utf8.parsed.method": "POST",
"utf8.parsed.status": "500",
},
},
},
{
name: "extract all keys with errors when none requested",
schema: arrow.NewSchema([]arrow.Field{
{Name: types.ColumnNameBuiltinMessage, Type: arrow.BinaryTypes.String},
semconv.FieldFromFQN("utf8.builtin.message", true),
}, nil),
input: arrowtest.Rows{
{"message": `{"level": "info", "status": "200", "method": "GET"}`}, // Valid line
{"message": `{"level": "error", "code": 500}`}, // Also valid, adds code column
{"message": `{"msg": "unclosed}`}, // Unclosed quote
{"message": `{"level": "debug", "method": "POST"}`}, // Valid line
{colMsg: `{"level": "info", "status": "200", "method": "GET"}`}, // Valid line
{colMsg: `{"level": "error", "code": 500}`}, // Also valid, adds code column
{colMsg: `{"msg": "unclosed}`}, // Unclosed quote
{colMsg: `{"level": "debug", "method": "POST"}`}, // Valid line
},
requestedKeys: nil, // nil means extract all keys
expectedFields: 7, // 7 columns: message, level, method, status, code, __error__, __error_details__
expectedOutput: arrowtest.Rows{
{"message": `{"level": "info", "status": "200", "method": "GET"}`, "level": "info", "method": "GET", "status": "200", "code": nil, types.ColumnNameParsedError: nil, types.ColumnNameParsedErrorDetails: nil},
{"message": `{"level": "error", "code": 500}`, "level": "error", "method": nil, "status": nil, "code": "500", types.ColumnNameParsedError: nil, types.ColumnNameParsedErrorDetails: nil},
{"message": `{"msg": "unclosed}`, "level": nil, "method": nil, "status": nil, "code": nil, types.ColumnNameParsedError: "JSONParserErr", types.ColumnNameParsedErrorDetails: "Value is string, but can't find closing '\"' symbol"},
{"message": `{"level": "debug", "method": "POST"}`, "level": "debug", "method": "POST", "status": nil, "code": nil, types.ColumnNameParsedError: nil, types.ColumnNameParsedErrorDetails: nil},
{
colMsg: `{"level": "info", "status": "200", "method": "GET"}`,
"utf8.parsed.level": "info",
"utf8.parsed.method": "GET",
"utf8.parsed.status": "200",
"utf8.parsed.code": nil,
semconv.ColumnIdentError.FQN(): nil,
semconv.ColumnIdentErrorDetails.FQN(): nil,
},
{
colMsg: `{"level": "error", "code": 500}`,
"utf8.parsed.level": "error",
"utf8.parsed.method": nil,
"utf8.parsed.status": nil,
"utf8.parsed.code": "500",
semconv.ColumnIdentError.FQN(): nil,
semconv.ColumnIdentErrorDetails.FQN(): nil,
},
{
colMsg: `{"msg": "unclosed}`,
"utf8.parsed.level": nil,
"utf8.parsed.method": nil,
"utf8.parsed.status": nil,
"utf8.parsed.code": nil,
semconv.ColumnIdentError.FQN(): "JSONParserErr",
semconv.ColumnIdentErrorDetails.FQN(): "Value is string, but can't find closing '\"' symbol",
},
{
colMsg: `{"level": "debug", "method": "POST"}`,
"utf8.parsed.level": "debug",
"utf8.parsed.method": "POST",
"utf8.parsed.status": nil,
"utf8.parsed.code": nil,
semconv.ColumnIdentError.FQN(): nil,
semconv.ColumnIdentErrorDetails.FQN(): nil,
},
},
},
{
name: "handle nested JSON objects with underscore flattening",
schema: arrow.NewSchema([]arrow.Field{
{Name: types.ColumnNameBuiltinMessage, Type: arrow.BinaryTypes.String},
semconv.FieldFromFQN("utf8.builtin.message", true),
}, nil),
input: arrowtest.Rows{
{"message": `{"user": {"name": "john", "details": {"age": "30", "city": "NYC"}}, "status": "active"}`},
{"message": `{"app": {"version": "1.0", "config": {"debug": "true"}}, "level": "info"}`},
{"message": `{"nested": {"deep": {"very": {"deep": "value"}}}}`},
{colMsg: `{"user": {"name": "john", "details": {"age": "30", "city": "NYC"}}, "status": "active"}`},
{colMsg: `{"app": {"version": "1.0", "config": {"debug": "true"}}, "level": "info"}`},
{colMsg: `{"nested": {"deep": {"very": {"deep": "value"}}}}`},
},
requestedKeys: nil, // Extract all keys including nested ones
expectedFields: 9, // message, app_config_debug, app_version, level, nested_deep_very_deep, status, user_details_age, user_details_city, user_name
expectedOutput: arrowtest.Rows{
{"message": `{"user": {"name": "john", "details": {"age": "30", "city": "NYC"}}, "status": "active"}`, "app_config_debug": nil, "app_version": nil, "level": nil, "nested_deep_very_deep": nil, "status": "active", "user_details_age": "30", "user_details_city": "NYC", "user_name": "john"},
{"message": `{"app": {"version": "1.0", "config": {"debug": "true"}}, "level": "info"}`, "app_config_debug": "true", "app_version": "1.0", "level": "info", "nested_deep_very_deep": nil, "status": nil, "user_details_age": nil, "user_details_city": nil, "user_name": nil},
{"message": `{"nested": {"deep": {"very": {"deep": "value"}}}}`, "app_config_debug": nil, "app_version": nil, "level": nil, "nested_deep_very_deep": "value", "status": nil, "user_details_age": nil, "user_details_city": nil, "user_name": nil},
{
colMsg: `{"user": {"name": "john", "details": {"age": "30", "city": "NYC"}}, "status": "active"}`,
"utf8.parsed.app_config_debug": nil,
"utf8.parsed.app_version": nil,
"utf8.parsed.level": nil,
"utf8.parsed.nested_deep_very_deep": nil,
"utf8.parsed.status": "active",
"utf8.parsed.user_details_age": "30",
"utf8.parsed.user_details_city": "NYC",
"utf8.parsed.user_name": "john",
},
{
colMsg: `{"app": {"version": "1.0", "config": {"debug": "true"}}, "level": "info"}`,
"utf8.parsed.app_config_debug": "true",
"utf8.parsed.app_version": "1.0",
"utf8.parsed.level": "info",
"utf8.parsed.nested_deep_very_deep": nil,
"utf8.parsed.status": nil,
"utf8.parsed.user_details_age": nil,
"utf8.parsed.user_details_city": nil,
"utf8.parsed.user_name": nil,
},
{
colMsg: `{"nested": {"deep": {"very": {"deep": "value"}}}}`,
"utf8.parsed.app_config_debug": nil,
"utf8.parsed.app_version": nil,
"utf8.parsed.level": nil,
"utf8.parsed.nested_deep_very_deep": "value",
"utf8.parsed.status": nil,
"utf8.parsed.user_details_age": nil,
"utf8.parsed.user_details_city": nil,
"utf8.parsed.user_name": nil,
},
},
},
{
name: "handle nested JSON with specific requested keys",
schema: arrow.NewSchema([]arrow.Field{
{Name: types.ColumnNameBuiltinMessage, Type: arrow.BinaryTypes.String},
semconv.FieldFromFQN("utf8.builtin.message", true),
}, nil),
input: arrowtest.Rows{
{"message": `{"user": {"name": "alice", "profile": {"email": "alice@example.com"}}, "level": "debug"}`},
{"message": `{"user": {"name": "bob"}, "level": "info"}`},
{"message": `{"level": "error", "error": {"code": "500", "message": "internal"}}`},
{colMsg: `{"user": {"name": "alice", "profile": {"email": "alice@example.com"}}, "level": "debug"}`},
{colMsg: `{"user": {"name": "bob"}, "level": "info"}`},
{colMsg: `{"level": "error", "error": {"code": "500", "message": "internal"}}`},
},
requestedKeys: []string{"user_name", "user_profile_email", "level"},
expectedFields: 4, // message, level, user_name, user_profile_email
expectedOutput: arrowtest.Rows{
{"message": `{"user": {"name": "alice", "profile": {"email": "alice@example.com"}}, "level": "debug"}`, "level": "debug", "user_name": "alice", "user_profile_email": "alice@example.com"},
{"message": `{"user": {"name": "bob"}, "level": "info"}`, "level": "info", "user_name": "bob", "user_profile_email": nil},
{"message": `{"level": "error", "error": {"code": "500", "message": "internal"}}`, "level": "error", "user_name": nil, "user_profile_email": nil},
{
colMsg: `{"user": {"name": "alice", "profile": {"email": "alice@example.com"}}, "level": "debug"}`,
"utf8.parsed.level": "debug",
"utf8.parsed.user_name": "alice",
"utf8.parsed.user_profile_email": "alice@example.com",
},
{
colMsg: `{"user": {"name": "bob"}, "level": "info"}`,
"utf8.parsed.level": "info",
"utf8.parsed.user_name": "bob",
"utf8.parsed.user_profile_email": nil,
},
{
colMsg: `{"level": "error", "error": {"code": "500", "message": "internal"}}`,
"utf8.parsed.level": "error",
"utf8.parsed.user_name": nil,
"utf8.parsed.user_profile_email": nil,
},
},
},
{
name: "accept JSON numbers as strings (v1 compatibility)",
schema: arrow.NewSchema([]arrow.Field{
{Name: types.ColumnNameBuiltinMessage, Type: arrow.BinaryTypes.String},
semconv.FieldFromFQN("utf8.builtin.message", true),
}, nil),
input: arrowtest.Rows{
{"message": `{"status": 200, "port": 8080, "timeout": 30.5, "retries": 0}`},
{"message": `{"level": "info", "pid": 12345, "memory": 256.8}`},
{"message": `{"score": -1, "version": 2.1, "enabled": true}`},
{colMsg: `{"status": 200, "port": 8080, "timeout": 30.5, "retries": 0}`},
{colMsg: `{"level": "info", "pid": 12345, "memory": 256.8}`},
{colMsg: `{"score": -1, "version": 2.1, "enabled": true}`},
},
requestedKeys: []string{"status", "port", "timeout", "pid", "memory", "score", "version"},
expectedFields: 8, // message, memory, pid, port, score, status, timeout, version
expectedOutput: arrowtest.Rows{
{"message": `{"status": 200, "port": 8080, "timeout": 30.5, "retries": 0}`, "memory": nil, "pid": nil, "port": "8080", "score": nil, "status": "200", "timeout": "30.5", "version": nil},
{"message": `{"level": "info", "pid": 12345, "memory": 256.8}`, "memory": "256.8", "pid": "12345", "port": nil, "score": nil, "status": nil, "timeout": nil, "version": nil},
{"message": `{"score": -1, "version": 2.1, "enabled": true}`, "memory": nil, "pid": nil, "port": nil, "score": "-1", "status": nil, "timeout": nil, "version": "2.1"},
{
colMsg: `{"status": 200, "port": 8080, "timeout": 30.5, "retries": 0}`,
"utf8.parsed.memory": nil,
"utf8.parsed.pid": nil,
"utf8.parsed.port": "8080",
"utf8.parsed.score": nil,
"utf8.parsed.status": "200",
"utf8.parsed.timeout": "30.5",
"utf8.parsed.version": nil,
},
{
colMsg: `{"level": "info", "pid": 12345, "memory": 256.8}`,
"utf8.parsed.memory": "256.8",
"utf8.parsed.pid": "12345",
"utf8.parsed.port": nil,
"utf8.parsed.score": nil,
"utf8.parsed.status": nil,
"utf8.parsed.timeout": nil,
"utf8.parsed.version": nil,
},
{
colMsg: `{"score": -1, "version": 2.1, "enabled": true}`,
"utf8.parsed.memory": nil,
"utf8.parsed.pid": nil,
"utf8.parsed.port": nil,
"utf8.parsed.score": "-1",
"utf8.parsed.status": nil,
"utf8.parsed.timeout": nil,
"utf8.parsed.version": "2.1",
},
},
},
{
name: "mixed nested objects and numbers",
schema: arrow.NewSchema([]arrow.Field{
{Name: types.ColumnNameBuiltinMessage, Type: arrow.BinaryTypes.String},
semconv.FieldFromFQN("utf8.builtin.message", true),
}, nil),
input: arrowtest.Rows{
{"message": `{"request": {"url": "/api/users", "method": "GET"}, "response": {"status": 200, "time": 45.2}}`},
{"message": `{"user": {"id": 123, "profile": {"age": 25}}, "active": true}`},
{colMsg: `{"request": {"url": "/api/users", "method": "GET"}, "response": {"status": 200, "time": 45.2}}`},
{colMsg: `{"user": {"id": 123, "profile": {"age": 25}}, "active": true}`},
},
requestedKeys: nil, // Extract all keys
expectedFields: 8, // message, active, request_method, request_url, response_status, response_time, user_id, user_profile_age
expectedOutput: arrowtest.Rows{
{"message": `{"request": {"url": "/api/users", "method": "GET"}, "response": {"status": 200, "time": 45.2}}`, "active": nil, "request_method": "GET", "request_url": "/api/users", "response_status": "200", "response_time": "45.2", "user_id": nil, "user_profile_age": nil},
{"message": `{"user": {"id": 123, "profile": {"age": 25}}, "active": true}`, "active": "true", "request_method": nil, "request_url": nil, "response_status": nil, "response_time": nil, "user_id": "123", "user_profile_age": "25"},
{
colMsg: `{"request": {"url": "/api/users", "method": "GET"}, "response": {"status": 200, "time": 45.2}}`,
"utf8.parsed.active": nil,
"utf8.parsed.request_method": "GET",
"utf8.parsed.request_url": "/api/users",
"utf8.parsed.response_status": "200",
"utf8.parsed.response_time": "45.2",
"utf8.parsed.user_id": nil,
"utf8.parsed.user_profile_age": nil,
},
{
colMsg: `{"user": {"id": 123, "profile": {"age": 25}}, "active": true}`,
"utf8.parsed.active": "true",
"utf8.parsed.request_method": nil,
"utf8.parsed.request_url": nil,
"utf8.parsed.response_status": nil,
"utf8.parsed.response_time": nil,
"utf8.parsed.user_id": "123",
"utf8.parsed.user_profile_age": "25",
},
},
},
} {

@ -81,7 +81,7 @@ func AssertPipelinesEqual(t testing.TB, left, right Pipeline) {
// Check schema compatibility
require.True(t, leftBatch.Schema().Equal(rightBatch.Schema()),
"Pipelines have incompatible schemas: %v vs %v",
"Pipelines have incompatible schemas:\nleft %v\nright %v",
leftBatch.Schema(), rightBatch.Schema())
// Compare rows until one of the batches is exhausted

@ -8,7 +8,7 @@ import (
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
)
func NewProjectPipeline(input Pipeline, columns []physical.ColumnExpression, evaluator *expressionEvaluator) (*GenericPipeline, error) {
@ -40,7 +40,8 @@ func NewProjectPipeline(input Pipeline, columns []physical.ColumnExpression, eva
if err != nil {
return failureState(err)
}
fields = append(fields, arrow.Field{Name: columnNames[i], Type: vec.Type().ArrowType(), Metadata: types.ColumnMetadata(vec.ColumnType(), vec.Type())})
ident := semconv.NewIdentifier(columnNames[i], vec.ColumnType(), vec.Type())
fields = append(fields, semconv.FieldFromIdent(ident, true))
arr := vec.ToArray()
defer arr.Release()
projected = append(projected, arr)

@ -8,14 +8,15 @@ import (
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
)
func TestNewProjectPipeline(t *testing.T) {
fields := []arrow.Field{
{Name: "name", Type: types.Arrow.String, Metadata: types.ColumnMetadata(types.ColumnTypeBuiltin, types.Loki.String)},
{Name: "age", Type: types.Arrow.Integer, Metadata: types.ColumnMetadata(types.ColumnTypeBuiltin, types.Loki.Integer)},
{Name: "city", Type: types.Arrow.String, Metadata: types.ColumnMetadata(types.ColumnTypeBuiltin, types.Loki.String)},
semconv.FieldFromFQN("utf8.builtin.name", false),
semconv.FieldFromFQN("int64.builtin.age", false),
semconv.FieldFromFQN("utf8.builtin.city", false),
}
t.Run("project single column", func(t *testing.T) {
@ -45,7 +46,7 @@ func TestNewProjectPipeline(t *testing.T) {
// Create expected output
expectedCSV := "Alice\nBob\nCharlie"
expectedFields := []arrow.Field{
{Name: "name", Type: types.Arrow.String, Metadata: types.ColumnMetadata(types.ColumnTypeBuiltin, types.Loki.String)},
semconv.FieldFromFQN("utf8.builtin.name", true),
}
expectedRecord, err := CSVToArrowWithAllocator(alloc, expectedFields, expectedCSV)
require.NoError(t, err)
@ -84,8 +85,8 @@ func TestNewProjectPipeline(t *testing.T) {
// Create expected output
expectedCSV := "Alice,New York\nBob,Boston\nCharlie,Seattle"
expectedFields := []arrow.Field{
{Name: "name", Type: types.Arrow.String, Metadata: types.ColumnMetadata(types.ColumnTypeBuiltin, types.Loki.String)},
{Name: "city", Type: types.Arrow.String, Metadata: types.ColumnMetadata(types.ColumnTypeBuiltin, types.Loki.String)},
semconv.FieldFromFQN("utf8.builtin.name", true),
semconv.FieldFromFQN("utf8.builtin.city", true),
}
expectedRecord, err := CSVToArrow(expectedFields, expectedCSV)
require.NoError(t, err)
@ -127,9 +128,9 @@ func TestNewProjectPipeline(t *testing.T) {
// Create expected output
expectedCSV := "New York,30,Alice\nBoston,25,Bob\nSeattle,35,Charlie"
expectedFields := []arrow.Field{
{Name: "city", Type: types.Arrow.String, Metadata: types.ColumnMetadata(types.ColumnTypeBuiltin, types.Loki.String)},
{Name: "age", Type: types.Arrow.Integer, Metadata: types.ColumnMetadata(types.ColumnTypeBuiltin, types.Loki.Integer)},
{Name: "name", Type: types.Arrow.String, Metadata: types.ColumnMetadata(types.ColumnTypeBuiltin, types.Loki.String)},
semconv.FieldFromFQN("utf8.builtin.city", true),
semconv.FieldFromFQN("int64.builtin.age", true),
semconv.FieldFromFQN("utf8.builtin.name", true),
}
expectedRecord, err := CSVToArrow(expectedFields, expectedCSV)
require.NoError(t, err)
@ -173,8 +174,8 @@ func TestNewProjectPipeline(t *testing.T) {
// Create expected output also split across multiple records
expectedFields := []arrow.Field{
{Name: "name", Type: types.Arrow.String, Metadata: types.ColumnMetadata(types.ColumnTypeBuiltin, types.Loki.String)},
{Name: "age", Type: types.Arrow.Integer, Metadata: types.ColumnMetadata(types.ColumnTypeBuiltin, types.Loki.Integer)},
semconv.FieldFromFQN("utf8.builtin.name", true),
semconv.FieldFromFQN("int64.builtin.age", true),
}
expected := `

@ -10,43 +10,52 @@ import (
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/util/arrowtest"
)
const arrowTimestampFormat = "2006-01-02T15:04:05.000000000Z"
var (
colTs = "timestamp_ns.builtin.timestamp"
colEnv = "utf8.label.env"
colSvc = "utf8.label.service"
colLvl = "utf8.metadata.severity"
colVal = "float64.generated.value"
)
func TestRangeAggregationPipeline_instant(t *testing.T) {
alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer alloc.AssertSize(t, 0)
// input schema with timestamp, partition-by columns and non-partition columns
fields := []arrow.Field{
{Name: types.ColumnNameBuiltinTimestamp, Type: types.Arrow.Timestamp, Metadata: types.ColumnMetadataBuiltinTimestamp},
{Name: "env", Type: types.Arrow.String, Metadata: types.ColumnMetadata(types.ColumnTypeLabel, types.Loki.String)},
{Name: "service", Type: types.Arrow.String, Metadata: types.ColumnMetadata(types.ColumnTypeLabel, types.Loki.String)},
{Name: "severity", Type: types.Arrow.String, Metadata: types.ColumnMetadata(types.ColumnTypeMetadata, types.Loki.String)}, // extra column not included in partition_by
semconv.FieldFromFQN(colTs, false),
semconv.FieldFromFQN(colEnv, false),
semconv.FieldFromFQN(colSvc, false),
semconv.FieldFromFQN(colLvl, true),
}
schema := arrow.NewSchema(fields, nil)
rowsPipelineA := []arrowtest.Rows{
{
{"timestamp": time.Unix(20, 0).UTC(), "env": "prod", "service": "app1", "severity": "error"}, // included
{"timestamp": time.Unix(15, 0).UTC(), "env": "prod", "service": "app1", "severity": "info"},
{"timestamp": time.Unix(10, 0).UTC(), "env": "prod", "service": "app1", "severity": "error"}, // excluded, open interval
{"timestamp": time.Unix(12, 0).UTC(), "env": "prod", "service": "app2", "severity": "error"},
{"timestamp": time.Unix(12, 0).UTC(), "env": "dev", "service": "", "severity": "error"},
{colTs: time.Unix(20, 0).UTC(), colEnv: "prod", colSvc: "app1", colLvl: "error"}, // included
{colTs: time.Unix(15, 0).UTC(), colEnv: "prod", colSvc: "app1", colLvl: "info"},
{colTs: time.Unix(10, 0).UTC(), colEnv: "prod", colSvc: "app1", colLvl: "error"}, // excluded, open interval
{colTs: time.Unix(12, 0).UTC(), colEnv: "prod", colSvc: "app2", colLvl: "error"},
{colTs: time.Unix(12, 0).UTC(), colEnv: "dev", colSvc: "", colLvl: "error"},
},
}
rowsPipelineB := []arrowtest.Rows{
{
{"timestamp": time.Unix(15, 0).UTC(), "env": "prod", "service": "app2", "severity": "info"},
{"timestamp": time.Unix(12, 0).UTC(), "env": "prod", "service": "app2", "severity": "error"},
{colTs: time.Unix(15, 0).UTC(), colEnv: "prod", colSvc: "app2", colLvl: "info"},
{colTs: time.Unix(12, 0).UTC(), colEnv: "prod", colSvc: "app2", colLvl: "error"},
},
{
{"timestamp": time.Unix(15, 0).UTC(), "env": "prod", "service": "app3", "severity": "info"},
{"timestamp": time.Unix(12, 0).UTC(), "env": "prod", "service": "app3", "severity": "error"},
{"timestamp": time.Unix(5, 0).UTC(), "env": "dev", "service": "app2", "severity": "error"}, // excluded, out of range
{colTs: time.Unix(15, 0).UTC(), colEnv: "prod", colSvc: "app3", colLvl: "info"},
{colTs: time.Unix(12, 0).UTC(), colEnv: "prod", colSvc: "app3", colLvl: "error"},
{colTs: time.Unix(5, 0).UTC(), colEnv: "dev", colSvc: "app2", colLvl: "error"}, // excluded, out of range
},
}
@ -83,10 +92,10 @@ func TestRangeAggregationPipeline_instant(t *testing.T) {
defer record.Release()
expect := arrowtest.Rows{
{"timestamp": time.Unix(20, 0).UTC(), "value": float64(2), "env": "prod", "service": "app1"},
{"timestamp": time.Unix(20, 0).UTC(), "value": float64(3), "env": "prod", "service": "app2"},
{"timestamp": time.Unix(20, 0).UTC(), "value": float64(2), "env": "prod", "service": "app3"},
{"timestamp": time.Unix(20, 0).UTC(), "value": float64(1), "env": "dev", "service": nil},
{colTs: time.Unix(20, 0).UTC(), colVal: float64(2), "utf8.ambiguous.env": "prod", "utf8.ambiguous.service": "app1"},
{colTs: time.Unix(20, 0).UTC(), colVal: float64(3), "utf8.ambiguous.env": "prod", "utf8.ambiguous.service": "app2"},
{colTs: time.Unix(20, 0).UTC(), colVal: float64(2), "utf8.ambiguous.env": "prod", "utf8.ambiguous.service": "app3"},
{colTs: time.Unix(20, 0).UTC(), colVal: float64(1), "utf8.ambiguous.env": "dev", "utf8.ambiguous.service": nil},
}
rows, err := arrowtest.RecordRows(record)
@ -105,10 +114,10 @@ func TestRangeAggregationPipeline(t *testing.T) {
var (
fields = []arrow.Field{
{Name: types.ColumnNameBuiltinTimestamp, Type: types.Arrow.Timestamp, Metadata: types.ColumnMetadataBuiltinTimestamp},
{Name: "env", Type: types.Arrow.String, Metadata: types.ColumnMetadata(types.ColumnTypeMetadata, types.Loki.String)},
{Name: "service", Type: types.Arrow.String, Metadata: types.ColumnMetadata(types.ColumnTypeMetadata, types.Loki.String)},
{Name: "severity", Type: types.Arrow.String, Metadata: types.ColumnMetadata(types.ColumnTypeMetadata, types.Loki.String)},
semconv.FieldFromFQN(colTs, false),
semconv.FieldFromFQN(colEnv, false),
semconv.FieldFromFQN(colSvc, false),
semconv.FieldFromFQN(colLvl, true),
}
schema = arrow.NewSchema(fields, nil)
@ -117,23 +126,23 @@ func TestRangeAggregationPipeline(t *testing.T) {
rowsPipelineA = []arrowtest.Rows{
{
// time.Unix(0, 0) is not part of any window, it falls on the open interval of the first window
{"timestamp": time.Unix(0, 0).UTC(), "env": "prod", "service": "app1", "severity": "info"},
{"timestamp": time.Unix(2, 0).UTC(), "env": "prod", "service": "app1", "severity": "warn"},
{"timestamp": time.Unix(4, 0).UTC(), "env": "prod", "service": "app1", "severity": "info"},
{"timestamp": time.Unix(5, 0).UTC(), "env": "prod", "service": "app2", "severity": "error"},
{colTs: time.Unix(0, 0).UTC(), colEnv: "prod", colSvc: "app1", colLvl: "info"},
{colTs: time.Unix(2, 0).UTC(), colEnv: "prod", colSvc: "app1", colLvl: "warn"},
{colTs: time.Unix(4, 0).UTC(), colEnv: "prod", colSvc: "app1", colLvl: "info"},
{colTs: time.Unix(5, 0).UTC(), colEnv: "prod", colSvc: "app2", colLvl: "error"},
}, {
{"timestamp": time.Unix(6, 0).UTC(), "env": "dev", "service": "app1", "severity": "info"},
{"timestamp": time.Unix(8, 0).UTC(), "env": "prod", "service": "app1", "severity": "error"},
{"timestamp": time.Unix(10, 0).UTC(), "env": "prod", "service": "app2", "severity": "info"},
{"timestamp": time.Unix(12, 0).UTC(), "env": "prod", "service": "app1", "severity": "info"},
{"timestamp": time.Unix(15, 0).UTC(), "env": "prod", "service": "app2", "severity": "error"},
{colTs: time.Unix(6, 0).UTC(), colEnv: "dev", colSvc: "app1", colLvl: "info"},
{colTs: time.Unix(8, 0).UTC(), colEnv: "prod", colSvc: "app1", colLvl: "error"},
{colTs: time.Unix(10, 0).UTC(), colEnv: "prod", colSvc: "app2", colLvl: "info"},
{colTs: time.Unix(12, 0).UTC(), colEnv: "prod", colSvc: "app1", colLvl: "info"},
{colTs: time.Unix(15, 0).UTC(), colEnv: "prod", colSvc: "app2", colLvl: "error"},
},
}
rowsPiplelineB = []arrowtest.Rows{{
{"timestamp": time.Unix(20, 0).UTC(), "env": "dev", "service": "app1", "severity": "info"},
{"timestamp": time.Unix(25, 0).UTC(), "env": "dev", "service": "app2", "severity": "error"},
{"timestamp": time.Unix(28, 0).UTC(), "env": "dev", "service": "app1", "severity": "info"},
{"timestamp": time.Unix(30, 0).UTC(), "env": "dev", "service": "app2", "severity": "info"},
{colTs: time.Unix(20, 0).UTC(), colEnv: "dev", colSvc: "app1", colLvl: "info"},
{colTs: time.Unix(25, 0).UTC(), colEnv: "dev", colSvc: "app2", colLvl: "error"},
{colTs: time.Unix(28, 0).UTC(), colEnv: "dev", colSvc: "app1", colLvl: "info"},
{colTs: time.Unix(30, 0).UTC(), colEnv: "dev", colSvc: "app2", colLvl: "info"},
}}
)
@ -174,18 +183,18 @@ func TestRangeAggregationPipeline(t *testing.T) {
expect := arrowtest.Rows{
// time.Unix(10, 0)
{"timestamp": time.Unix(10, 0).UTC(), "env": "prod", "service": "app1", "value": float64(3)},
{"timestamp": time.Unix(10, 0).UTC(), "env": "prod", "service": "app2", "value": float64(2)},
{"timestamp": time.Unix(10, 0).UTC(), "env": "dev", "service": "app1", "value": float64(1)},
{colTs: time.Unix(10, 0).UTC(), "utf8.ambiguous.env": "prod", "utf8.ambiguous.service": "app1", colVal: float64(3)},
{colTs: time.Unix(10, 0).UTC(), "utf8.ambiguous.env": "prod", "utf8.ambiguous.service": "app2", colVal: float64(2)},
{colTs: time.Unix(10, 0).UTC(), "utf8.ambiguous.env": "dev", "utf8.ambiguous.service": "app1", colVal: float64(1)},
// time.Unix(20, 0)
{"timestamp": time.Unix(20, 0).UTC(), "env": "prod", "service": "app1", "value": float64(1)},
{"timestamp": time.Unix(20, 0).UTC(), "env": "prod", "service": "app2", "value": float64(1)},
{"timestamp": time.Unix(20, 0).UTC(), "env": "dev", "service": "app1", "value": float64(1)},
{colTs: time.Unix(20, 0).UTC(), "utf8.ambiguous.env": "prod", "utf8.ambiguous.service": "app1", colVal: float64(1)},
{colTs: time.Unix(20, 0).UTC(), "utf8.ambiguous.env": "prod", "utf8.ambiguous.service": "app2", colVal: float64(1)},
{colTs: time.Unix(20, 0).UTC(), "utf8.ambiguous.env": "dev", "utf8.ambiguous.service": "app1", colVal: float64(1)},
// time.Unix(30, 0)
{"timestamp": time.Unix(30, 0).UTC(), "env": "dev", "service": "app2", "value": float64(2)},
{"timestamp": time.Unix(30, 0).UTC(), "env": "dev", "service": "app1", "value": float64(1)},
{colTs: time.Unix(30, 0).UTC(), "utf8.ambiguous.env": "dev", "utf8.ambiguous.service": "app2", colVal: float64(2)},
{colTs: time.Unix(30, 0).UTC(), "utf8.ambiguous.env": "dev", "utf8.ambiguous.service": "app1", colVal: float64(1)},
}
rows, err := arrowtest.RecordRows(record)
@ -195,7 +204,7 @@ func TestRangeAggregationPipeline(t *testing.T) {
// rows are expected to be sorted by timestamp.
// for a given timestamp, no ordering is enforced based on labels.
require.True(t, slices.IsSortedFunc(rows, func(a, b arrowtest.Row) int {
return a["timestamp"].(time.Time).Compare(b["timestamp"].(time.Time))
return a[colTs].(time.Time).Compare(b[colTs].(time.Time))
}))
require.ElementsMatch(t, expect, rows)
})
@ -222,31 +231,31 @@ func TestRangeAggregationPipeline(t *testing.T) {
expect := arrowtest.Rows{
// time.Unix(10, 0)
{"timestamp": time.Unix(10, 0).UTC(), "env": "prod", "service": "app1", "value": float64(3)},
{"timestamp": time.Unix(10, 0).UTC(), "env": "prod", "service": "app2", "value": float64(2)},
{"timestamp": time.Unix(10, 0).UTC(), "env": "dev", "service": "app1", "value": float64(1)},
{colTs: time.Unix(10, 0).UTC(), "utf8.ambiguous.env": "prod", "utf8.ambiguous.service": "app1", colVal: float64(3)},
{colTs: time.Unix(10, 0).UTC(), "utf8.ambiguous.env": "prod", "utf8.ambiguous.service": "app2", colVal: float64(2)},
{colTs: time.Unix(10, 0).UTC(), "utf8.ambiguous.env": "dev", "utf8.ambiguous.service": "app1", colVal: float64(1)},
// time.Unix(15, 0)
{"timestamp": time.Unix(15, 0).UTC(), "env": "prod", "service": "app2", "value": float64(2)},
{"timestamp": time.Unix(15, 0).UTC(), "env": "prod", "service": "app1", "value": float64(2)},
{"timestamp": time.Unix(15, 0).UTC(), "env": "dev", "service": "app1", "value": float64(1)},
{colTs: time.Unix(15, 0).UTC(), "utf8.ambiguous.env": "prod", "utf8.ambiguous.service": "app2", colVal: float64(2)},
{colTs: time.Unix(15, 0).UTC(), "utf8.ambiguous.env": "prod", "utf8.ambiguous.service": "app1", colVal: float64(2)},
{colTs: time.Unix(15, 0).UTC(), "utf8.ambiguous.env": "dev", "utf8.ambiguous.service": "app1", colVal: float64(1)},
// time.Unix(20, 0)
{"timestamp": time.Unix(20, 0).UTC(), "env": "prod", "service": "app1", "value": float64(1)},
{"timestamp": time.Unix(20, 0).UTC(), "env": "prod", "service": "app2", "value": float64(1)},
{"timestamp": time.Unix(20, 0).UTC(), "env": "dev", "service": "app1", "value": float64(1)},
{colTs: time.Unix(20, 0).UTC(), "utf8.ambiguous.env": "prod", "utf8.ambiguous.service": "app1", colVal: float64(1)},
{colTs: time.Unix(20, 0).UTC(), "utf8.ambiguous.env": "prod", "utf8.ambiguous.service": "app2", colVal: float64(1)},
{colTs: time.Unix(20, 0).UTC(), "utf8.ambiguous.env": "dev", "utf8.ambiguous.service": "app1", colVal: float64(1)},
// time.Unix(25, 0)
{"timestamp": time.Unix(25, 0).UTC(), "env": "dev", "service": "app1", "value": float64(1)},
{"timestamp": time.Unix(25, 0).UTC(), "env": "dev", "service": "app2", "value": float64(1)},
{colTs: time.Unix(25, 0).UTC(), "utf8.ambiguous.env": "dev", "utf8.ambiguous.service": "app1", colVal: float64(1)},
{colTs: time.Unix(25, 0).UTC(), "utf8.ambiguous.env": "dev", "utf8.ambiguous.service": "app2", colVal: float64(1)},
// time.Unix(30, 0)
{"timestamp": time.Unix(30, 0).UTC(), "env": "dev", "service": "app2", "value": float64(2)},
{"timestamp": time.Unix(30, 0).UTC(), "env": "dev", "service": "app1", "value": float64(1)},
{colTs: time.Unix(30, 0).UTC(), "utf8.ambiguous.env": "dev", "utf8.ambiguous.service": "app2", colVal: float64(2)},
{colTs: time.Unix(30, 0).UTC(), "utf8.ambiguous.env": "dev", "utf8.ambiguous.service": "app1", colVal: float64(1)},
// time.Unix(35, 0)
{"timestamp": time.Unix(35, 0).UTC(), "env": "dev", "service": "app2", "value": float64(1)},
{"timestamp": time.Unix(35, 0).UTC(), "env": "dev", "service": "app1", "value": float64(1)},
{colTs: time.Unix(35, 0).UTC(), "utf8.ambiguous.env": "dev", "utf8.ambiguous.service": "app2", colVal: float64(1)},
{colTs: time.Unix(35, 0).UTC(), "utf8.ambiguous.env": "dev", "utf8.ambiguous.service": "app1", colVal: float64(1)},
}
rows, err := arrowtest.RecordRows(record)
@ -256,7 +265,8 @@ func TestRangeAggregationPipeline(t *testing.T) {
// rows are expected to be sorted by timestamp.
// for a given timestamp, no ordering is enforced based on labels.
require.True(t, slices.IsSortedFunc(rows, func(a, b arrowtest.Row) int {
return a["timestamp"].(time.Time).Compare(b["timestamp"].(time.Time))
t.Log(a, b)
return a[colTs].(time.Time).Compare(b[colTs].(time.Time))
}))
require.ElementsMatch(t, expect, rows)
})
@ -283,16 +293,16 @@ func TestRangeAggregationPipeline(t *testing.T) {
expect := arrowtest.Rows{
// time.Unix(10, 0)
{"timestamp": time.Unix(10, 0).UTC(), "env": "prod", "service": "app1", "value": float64(1)},
{"timestamp": time.Unix(10, 0).UTC(), "env": "prod", "service": "app2", "value": float64(1)},
{"timestamp": time.Unix(10, 0).UTC(), "env": "dev", "service": "app1", "value": float64(1)},
{colTs: time.Unix(10, 0).UTC(), "utf8.ambiguous.env": "prod", "utf8.ambiguous.service": "app1", colVal: float64(1)},
{colTs: time.Unix(10, 0).UTC(), "utf8.ambiguous.env": "prod", "utf8.ambiguous.service": "app2", colVal: float64(1)},
{colTs: time.Unix(10, 0).UTC(), "utf8.ambiguous.env": "dev", "utf8.ambiguous.service": "app1", colVal: float64(1)},
// time.Unix(20, 0)
{"timestamp": time.Unix(20, 0).UTC(), "env": "dev", "service": "app1", "value": float64(1)},
{colTs: time.Unix(20, 0).UTC(), "utf8.ambiguous.env": "dev", "utf8.ambiguous.service": "app1", colVal: float64(1)},
// time.Unix(30, 0)
{"timestamp": time.Unix(30, 0).UTC(), "env": "dev", "service": "app2", "value": float64(1)},
{"timestamp": time.Unix(30, 0).UTC(), "env": "dev", "service": "app1", "value": float64(1)},
{colTs: time.Unix(30, 0).UTC(), "utf8.ambiguous.env": "dev", "utf8.ambiguous.service": "app2", colVal: float64(1)},
{colTs: time.Unix(30, 0).UTC(), "utf8.ambiguous.env": "dev", "utf8.ambiguous.service": "app1", colVal: float64(1)},
}
rows, err := arrowtest.RecordRows(record)
@ -302,7 +312,7 @@ func TestRangeAggregationPipeline(t *testing.T) {
// rows are expected to be sorted by timestamp.
// for a given timestamp, no ordering is enforced based on labels.
require.True(t, slices.IsSortedFunc(rows, func(a, b arrowtest.Row) int {
return a["timestamp"].(time.Time).Compare(b["timestamp"].(time.Time))
return a[colTs].(time.Time).Compare(b[colTs].(time.Time))
}))
require.ElementsMatch(t, expect, rows)
})

@ -58,9 +58,9 @@ func validateSchemaCompatibility(a, b *arrow.Schema) error {
aField, bField := a.Field(i), b.Field(i)
if !arrow.TypeEqual(aField.Type, bField.Type) {
return fmt.Errorf("field %d has different types: %s vs %s", i, aField.Type, bField.Type)
return fmt.Errorf("field %d has different types: %s (%s) vs %s (%s)", i, aField.Type, aField.Name, bField.Type, bField.Name)
} else if aField.Nullable != bField.Nullable {
return fmt.Errorf("field %d has different nullability: %t vs %t", i, aField.Nullable, bField.Nullable)
return fmt.Errorf("field %d has different nullability: %t (%s) vs %t (%s)", i, aField.Nullable, aField.Name, bField.Nullable, bField.Name)
}
}

@ -10,6 +10,7 @@ import (
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
)
@ -39,14 +40,14 @@ func newStreamInjector(alloc memory.Allocator, view *streamsView) *streamInjecto
//
// The returned record must be Release()d by the caller when no longer needed.
func (si *streamInjector) Inject(ctx context.Context, in arrow.Record) (arrow.Record, error) {
streamIDIndex, err := si.findStreamIDColumn(in)
streamIDCol, streamIDIndex, err := columnForFQN(streamInjectorColumnName, in)
if err != nil {
return nil, fmt.Errorf("finding stream ID column: %w", err)
return nil, err
}
streamIDValues, ok := in.Column(streamIDIndex).(*array.Int64)
streamIDValues, ok := streamIDCol.(*array.Int64)
if !ok {
return nil, fmt.Errorf("stream ID column %q must be of type int64, got %s", streamInjectorColumnName, in.Schema().Field(streamIDIndex).Type)
return nil, fmt.Errorf("column %s must be of type int64, got %s", streamInjectorColumnName, in.Schema().Field(streamIDIndex))
}
type labelColumn struct {
@ -56,7 +57,7 @@ func (si *streamInjector) Inject(ctx context.Context, in arrow.Record) (arrow.Re
var (
labels = make([]*labelColumn, 0, si.view.NumLabels())
labelLookup = make(map[string]*labelColumn, si.view.NumLabels())
labelLookup = make(map[semconv.Identifier]*labelColumn, si.view.NumLabels())
)
defer func() {
for _, col := range labels {
@ -65,22 +66,19 @@ func (si *streamInjector) Inject(ctx context.Context, in arrow.Record) (arrow.Re
}()
getColumn := func(name string) *labelColumn {
if col, ok := labelLookup[name]; ok {
ident := semconv.NewIdentifier(name, types.ColumnTypeLabel, types.Loki.String)
if col, ok := labelLookup[*ident]; ok {
return col
}
col := &labelColumn{
Field: arrow.Field{
Name: name,
Type: arrow.BinaryTypes.String,
Nullable: true,
Metadata: types.ColumnMetadata(types.ColumnTypeLabel, types.Loki.String),
},
Field: semconv.FieldFromIdent(ident, true), // labels are nullable
Builder: array.NewStringBuilder(si.alloc),
}
labels = append(labels, col)
labelLookup[name] = col
labelLookup[*ident] = col
return col
}
@ -148,13 +146,3 @@ func (si *streamInjector) Inject(ctx context.Context, in arrow.Record) (arrow.Re
schema := arrow.NewSchemaWithEndian(fields, &md, in.Schema().Endianness())
return array.NewRecord(schema, arrs, in.NumRows()), nil
}
// findStreamIDColumn locates the stream ID column in the input record and
// returns its index. It fails when the column is missing from the schema or
// appears more than once.
func (si *streamInjector) findStreamIDColumn(in arrow.Record) (int, error) {
	indices := in.Schema().FieldIndices(streamInjectorColumnName)
	switch len(indices) {
	case 0:
		return -1, fmt.Errorf("stream ID column %q not found in input record", streamInjectorColumnName)
	case 1:
		return indices[0], nil
	default:
		return -1, fmt.Errorf("multiple stream ID columns found in input record: %v", indices)
	}
}

@ -23,10 +23,10 @@ func Test_streamInjector(t *testing.T) {
sec := buildStreamsSection(t, inputStreams)
input := arrowtest.Rows{
{"stream_id.int64": 2, "ts": int64(1), "line": "log line 1"},
{"stream_id.int64": 1, "ts": int64(2), "line": "log line 2"},
{"stream_id.int64": 3, "ts": int64(3), "line": "log line 3"},
{"stream_id.int64": 2, "ts": int64(4), "line": "log line 4"},
{streamInjectorColumnName: 2, "ts": int64(1), "line": "log line 1"},
{streamInjectorColumnName: 1, "ts": int64(2), "line": "log line 2"},
{streamInjectorColumnName: 3, "ts": int64(3), "line": "log line 3"},
{streamInjectorColumnName: 2, "ts": int64(4), "line": "log line 4"},
}
record := input.Record(alloc, input.Schema())
@ -43,10 +43,10 @@ func Test_streamInjector(t *testing.T) {
require.NoError(t, err)
expect := arrowtest.Rows{
{"app": "loki", "env": "dev", "region": nil, "ts": int64(1), "line": "log line 1"},
{"app": "loki", "env": "prod", "region": "us-west", "ts": int64(2), "line": "log line 2"},
{"app": "loki", "env": "prod", "region": "us-east", "ts": int64(3), "line": "log line 3"},
{"app": "loki", "env": "dev", "region": nil, "ts": int64(4), "line": "log line 4"},
{"utf8.label.app": "loki", "utf8.label.env": "dev", "utf8.label.region": nil, "ts": int64(1), "line": "log line 1"},
{"utf8.label.app": "loki", "utf8.label.env": "prod", "utf8.label.region": "us-west", "ts": int64(2), "line": "log line 2"},
{"utf8.label.app": "loki", "utf8.label.env": "prod", "utf8.label.region": "us-east", "ts": int64(3), "line": "log line 3"},
{"utf8.label.app": "loki", "utf8.label.env": "dev", "utf8.label.region": nil, "ts": int64(4), "line": "log line 4"},
}
actual, err := arrowtest.RecordRows(output)

@ -9,6 +9,7 @@ import (
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
)
@ -70,39 +71,38 @@ func exprsToFields(exprs []physical.ColumnExpression) ([]arrow.Field, error) {
panic("topkPipeline only supports ColumnExpr expressions")
}
if expr.Ref.Type == types.ColumnTypeAmbiguous {
// TODO(rfratto): It's not clear how topk should sort when there's an
// ambiguous column reference, since ambiguous column references can
// refer to multiple columns.
return nil, fmt.Errorf("topkPipeline does not support ambiguous column types")
dt, err := guessLokiType(expr.Ref)
if err != nil {
return nil, err
}
dt, md := arrowTypeFromColumnRef(expr.Ref)
fields = append(fields, arrow.Field{
Name: expr.Ref.Column,
Type: dt,
Nullable: true,
Metadata: md,
})
ident := semconv.NewIdentifier(expr.Ref.Column, expr.Ref.Type, dt)
fields = append(fields, semconv.FieldFromIdent(ident, true))
}
return fields, nil
}
func arrowTypeFromColumnRef(ref types.ColumnRef) (arrow.DataType, arrow.Metadata) {
if ref.Type == types.ColumnTypeBuiltin {
func guessLokiType(ref types.ColumnRef) (types.DataType, error) {
switch ref.Type {
case types.ColumnTypeBuiltin:
switch ref.Column {
case types.ColumnNameBuiltinTimestamp:
return arrow.FixedWidthTypes.Timestamp_ns, types.ColumnMetadataBuiltinTimestamp
return types.Loki.Timestamp, nil
case types.ColumnNameBuiltinMessage:
return arrow.BinaryTypes.String, types.ColumnMetadataBuiltinMessage
return types.Loki.String, nil
default:
panic(fmt.Sprintf("unsupported builtin column type %s", ref))
}
case types.ColumnTypeGenerated:
return types.Loki.Float, nil
case types.ColumnTypeAmbiguous:
// TODO(rfratto): It's not clear how topk should sort when there's an
// ambiguous column reference, since ambiguous column references can
// refer to multiple columns.
return nil, fmt.Errorf("topkPipeline does not support ambiguous column types")
default:
return types.Loki.String, nil
}
return types.Arrow.String, types.ColumnMetadata(ref.Type, types.Loki.String)
}
// Read computes the topk as the next record. Read blocks until all input

@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/util/arrowtest"
)
@ -17,49 +18,35 @@ func Test_topk(t *testing.T) {
alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer alloc.AssertSize(t, 0)
colTs := semconv.ColumnIdentTimestamp.FQN()
colMsg := semconv.ColumnIdentMessage.FQN()
var (
fields = []arrow.Field{
{
Name: types.ColumnNameBuiltinTimestamp,
Type: arrow.FixedWidthTypes.Timestamp_ns,
Nullable: true,
Metadata: types.ColumnMetadata(
types.ColumnTypeBuiltin,
types.Loki.Timestamp,
),
},
{
Name: types.ColumnNameBuiltinMessage,
Type: arrow.BinaryTypes.String,
Nullable: true,
Metadata: types.ColumnMetadata(
types.ColumnTypeBuiltin,
types.Loki.String,
),
},
semconv.FieldFromIdent(semconv.ColumnIdentTimestamp, true),
semconv.FieldFromIdent(semconv.ColumnIdentMessage, true),
}
schema = arrow.NewSchema(fields, nil)
)
var (
pipelineA = NewArrowtestPipeline(alloc, schema, arrowtest.Rows{
{"timestamp": time.Unix(1, 0).UTC(), "message": "line A"},
{"timestamp": time.Unix(6, 0).UTC(), "message": "line F"},
{colTs: time.Unix(1, 0).UTC(), colMsg: "line A"},
{colTs: time.Unix(6, 0).UTC(), colMsg: "line F"},
}, arrowtest.Rows{
{"timestamp": time.Unix(2, 0).UTC(), "message": "line B"},
{"timestamp": time.Unix(7, 0).UTC(), "message": "line G"},
{colTs: time.Unix(2, 0).UTC(), colMsg: "line B"},
{colTs: time.Unix(7, 0).UTC(), colMsg: "line G"},
}, arrowtest.Rows{
{"timestamp": time.Unix(3, 0).UTC(), "message": "line C"},
{"timestamp": time.Unix(8, 0).UTC(), "message": "line H"},
{colTs: time.Unix(3, 0).UTC(), colMsg: "line C"},
{colTs: time.Unix(8, 0).UTC(), colMsg: "line H"},
})
pipelineB = NewArrowtestPipeline(alloc, schema, arrowtest.Rows{
{"timestamp": time.Unix(4, 0).UTC(), "message": "line D"},
{"timestamp": time.Unix(9, 0).UTC(), "message": "line I"},
{colTs: time.Unix(4, 0).UTC(), colMsg: "line D"},
{colTs: time.Unix(9, 0).UTC(), colMsg: "line I"},
}, arrowtest.Rows{
{"timestamp": time.Unix(5, 0).UTC(), "message": "line E"},
{"timestamp": time.Unix(10, 0).UTC(), "message": "line J"},
{colTs: time.Unix(5, 0).UTC(), colMsg: "line E"},
{colTs: time.Unix(10, 0).UTC(), colMsg: "line J"},
})
)
@ -83,9 +70,9 @@ func Test_topk(t *testing.T) {
defer rec.Release()
expect := arrowtest.Rows{
{"timestamp": time.Unix(1, 0).UTC(), "message": "line A"},
{"timestamp": time.Unix(2, 0).UTC(), "message": "line B"},
{"timestamp": time.Unix(3, 0).UTC(), "message": "line C"},
{colTs: time.Unix(1, 0).UTC(), colMsg: "line A"},
{colTs: time.Unix(2, 0).UTC(), colMsg: "line B"},
{colTs: time.Unix(3, 0).UTC(), colMsg: "line C"},
}
rows, err := arrowtest.RecordRows(rec)

@ -0,0 +1,29 @@
package executor
import (
"fmt"
"github.com/apache/arrow-go/v18/arrow"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
)
// columnForIdent returns the column ([arrow.Array]) and its column index in the schema of the given input batch ([arrow.Record]).
// It returns an optional error in case the column with the fully qualified name of the identifier could not be found,
// or there are multiple columns with the same name in the schema.
// In case of an error, the returned column index is -1.
func columnForIdent(ident *semconv.Identifier, batch arrow.Record) (arrow.Array, int, error) {
	return columnForFQN(ident.FQN(), batch)
}
// columnForFQN looks up the column with the given fully qualified name in the
// schema of the batch and returns the column array together with its index.
// It fails when the name is absent or matches more than one field; on error
// the returned index is -1.
func columnForFQN(fqn string, batch arrow.Record) (arrow.Array, int, error) {
	matches := batch.Schema().FieldIndices(fqn)
	switch len(matches) {
	case 0:
		return nil, -1, fmt.Errorf("column not found for %s", fqn)
	case 1:
		idx := matches[0]
		return batch.Column(idx), idx, nil
	default:
		return nil, -1, fmt.Errorf("multiple columns found for %s", fqn)
	}
}

@ -11,14 +11,14 @@ import (
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/util/arrowtest"
)
var (
incrementingIntPipeline = newRecordGenerator(
arrow.NewSchema([]arrow.Field{
{Name: "id", Type: types.Arrow.Integer, Metadata: types.ColumnMetadata(types.ColumnTypeBuiltin, types.Loki.Integer)},
semconv.FieldFromFQN("int64.builtin.id", false),
}, nil),
func(offset, maxRows, batchSize int64, schema *arrow.Schema) arrow.Record {
@ -55,8 +55,8 @@ const (
func timestampPipeline(start time.Time, order time.Duration) *recordGenerator {
return newRecordGenerator(
arrow.NewSchema([]arrow.Field{
{Name: "id", Type: types.Arrow.Integer, Metadata: types.ColumnMetadata(types.ColumnTypeBuiltin, types.Loki.Integer)},
{Name: "timestamp", Type: types.Arrow.Timestamp, Metadata: types.ColumnMetadata(types.ColumnTypeBuiltin, types.Loki.Timestamp)},
semconv.FieldFromFQN("int64.builtin.id", false),
semconv.FieldFromFQN("timestamp_ns.builtin.timestamp", false),
}, nil),
func(offset, maxRows, batchSize int64, schema *arrow.Schema) arrow.Record {

@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
)
@ -21,10 +22,10 @@ func TestVectorAggregationPipeline(t *testing.T) {
// input schema with timestamp, value and group by columns
fields := []arrow.Field{
{Name: types.ColumnNameBuiltinTimestamp, Type: types.Arrow.Timestamp, Metadata: types.ColumnMetadataBuiltinTimestamp},
{Name: types.ColumnNameGeneratedValue, Type: types.Arrow.Float, Metadata: types.ColumnMetadata(types.ColumnTypeGenerated, types.Loki.Float)},
{Name: "env", Type: types.Arrow.String, Metadata: types.ColumnMetadata(types.ColumnTypeLabel, types.Loki.String)},
{Name: "service", Type: types.Arrow.String, Metadata: types.ColumnMetadata(types.ColumnTypeLabel, types.Loki.String)},
semconv.FieldFromIdent(semconv.ColumnIdentTimestamp, false),
semconv.FieldFromIdent(semconv.ColumnIdentValue, false),
semconv.FieldFromFQN("utf8.label.env", true),
semconv.FieldFromFQN("utf8.label.service", true),
}
now := time.Now().UTC()

@ -6,7 +6,7 @@ import (
// FieldFromIdent converts a column identifier into an [arrow.Field], using
// the fully qualified name (FQN) as the field name so columns with the same
// short name but different column types remain distinguishable in a schema.
func FieldFromIdent(ident *Identifier, nullable bool) arrow.Field {
	return arrow.Field{
		Name:     ident.FQN(),
		Type:     ident.dataType.ArrowType(),
		Nullable: nullable,
	}

@ -15,12 +15,13 @@ const (
var (
ColumnIdentMessage = NewIdentifier("message", types.ColumnTypeBuiltin, types.Loki.String)
ColumnIdentTimestamp = NewIdentifier("timestamp", types.ColumnTypeBuiltin, types.Loki.Timestamp)
ColumnIdentValue = NewIdentifier("value", types.ColumnTypeGenerated, types.Loki.Integer)
ColumnIdentValue = NewIdentifier("value", types.ColumnTypeGenerated, types.Loki.Float)
ColumnIdentError = NewIdentifier("__error__", types.ColumnTypeGenerated, types.Loki.String)
ColumnIdentErrorDetails = NewIdentifier("__error_details__", types.ColumnTypeGenerated, types.Loki.String)
)
// NewIdentifier creates a new column identifier from given name, column type, and data type.
// The semantic type of an identifier is derived from its column type.
func NewIdentifier(name string, ct types.ColumnType, dt types.DataType) *Identifier {
return &Identifier{
columnName: name,
@ -38,31 +39,54 @@ type Identifier struct {
dataType types.DataType
}
// DataType returns the Loki data type of the identifier, i.e. the type of the
// values stored in the column this identifier refers to.
func (i *Identifier) DataType() types.DataType {
	return i.dataType
}
// ColumnType returns the column type of the identifier, e.g. label, builtin,
// metadata, parsed, or generated.
func (i *Identifier) ColumnType() types.ColumnType {
	return i.columnType
}
func (i *Identifier) Name() string {
return i.columnName
// SemType returns the semantic type of the identifier.
func (i *Identifier) SemType() SemanticType {
return SemTypeForColumnType(i.columnType)
}
// SemName returns the semantic name of the column identifier, defined as
// [ORIGIN].[NAME] in case of attributes
// [ORIGIN]:[NAME] in case of builtins
func (i *Identifier) SemName() string {
	st := SemTypeForColumnType(i.columnType)
	if st.Type == Attribute {
		return fmt.Sprintf("%s.%s", st.Origin, i.columnName)
	}
	if st.Type == Builtin {
		return fmt.Sprintf("%s:%s", st.Origin, i.columnName)
	}
	// Anything else has no semantic representation.
	return invalid
}
func (i *Identifier) Scope() Scope {
return ScopeForColumnType(i.columnType)
// ShortName returns the non-unique name part of the column identifier.
func (i *Identifier) ShortName() string {
return i.columnName
}
// String returns the string representation of the column.
// This must not be used as name for [arrow.Field]; use FQN for that.
func (i *Identifier) String() string {
	return fmt.Sprintf("%s[%s]", i.SemName(), i.columnType)
}
// FQN returns the fully qualified name of the identifier, in the form
// [DATATYPE].[COLUMNTYPE].[NAME]. This must be used as the name for
// [arrow.Field] so columns stay unambiguous within a schema.
func (i *Identifier) FQN() string {
	return fmt.Sprintf("%s.%s.%s", i.dataType, i.columnType, i.columnName)
}
// Equal checks equality of of the identifier against a second identifier.
func (i *Identifier) Equal(other *Identifier) bool {
if i == nil || other == nil {
return false
@ -123,16 +147,19 @@ func MustParseFQN(fqn string) *Identifier {
return ident
}
type scopeOrigin string
type scopeType string
// semOrigin describes where a column originates from (resource, record, generated, unscoped).
// Do not export this type, because we don't want to allow creating new instances outside of this package.
type semOrigin string

// semType describes whether a column is an attribute or a builtin.
// Do not export this type, because we don't want to allow creating new instances outside of this package.
type semType string
// Scope describes the origin and type of an identifier.
type Scope struct {
Origin scopeOrigin
Type scopeType
// SemanticType describes the origin and type of an identifier.
type SemanticType struct {
	Origin semOrigin
	Type semType
}
func (s Scope) String() string {
func (s SemanticType) String() string {
if s.Origin == InvalidOrigin || s.Type == InvalidType {
return invalid
}
@ -140,34 +167,34 @@ func (s Scope) String() string {
}
const (
Resource = scopeOrigin("resource")
Record = scopeOrigin("record")
Generated = scopeOrigin("generated")
Unscoped = scopeOrigin("unscoped")
Resource = semOrigin("resource")
Record = semOrigin("record")
Generated = semOrigin("generated")
Unscoped = semOrigin("unscoped")
Attribute = scopeType("attr")
Builtin = scopeType("builtin")
Attribute = semType("attr")
Builtin = semType("builtin")
InvalidOrigin = scopeOrigin("")
InvalidType = scopeType("")
InvalidOrigin = semOrigin("")
InvalidType = semType("")
)
// ScopeForColumnType converts a given [types.ColumnType] into a [Scope].
func ScopeForColumnType(value types.ColumnType) Scope {
// SemTypeForColumnType converts a given [types.ColumnType] into a [SemanticType].
func SemTypeForColumnType(value types.ColumnType) SemanticType {
switch value {
case types.ColumnTypeLabel:
return Scope{Resource, Attribute}
return SemanticType{Resource, Attribute}
case types.ColumnTypeBuiltin:
return Scope{Record, Builtin}
return SemanticType{Record, Builtin}
case types.ColumnTypeMetadata:
return Scope{Record, Attribute}
return SemanticType{Record, Attribute}
case types.ColumnTypeParsed:
return Scope{Generated, Attribute}
return SemanticType{Generated, Attribute}
case types.ColumnTypeGenerated:
return Scope{Generated, Builtin}
return SemanticType{Generated, Builtin}
case types.ColumnTypeAmbiguous:
return Scope{Unscoped, Attribute}
return SemanticType{Unscoped, Attribute}
default:
return Scope{}
return SemanticType{}
}
}

@ -65,13 +65,13 @@ func TestParsingInvalidColumnNames(t *testing.T) {
func TestScope(t *testing.T) {
tc := []struct {
name string
expected Scope
expected SemanticType
}{
{"utf8.builtin.message", Scope{Record, Builtin}},
{"utf8.label.service_name", Scope{Resource, Attribute}},
{"utf8.metadata.service_name", Scope{Record, Attribute}},
{"utf8.parsed.level", Scope{Generated, Attribute}},
{"utf8.generated.value", Scope{Generated, Builtin}},
{"utf8.builtin.message", SemanticType{Record, Builtin}},
{"utf8.label.service_name", SemanticType{Resource, Attribute}},
{"utf8.metadata.service_name", SemanticType{Record, Attribute}},
{"utf8.parsed.level", SemanticType{Generated, Attribute}},
{"utf8.generated.value", SemanticType{Generated, Builtin}},
}
for _, tt := range tc {
@ -79,7 +79,7 @@ func TestScope(t *testing.T) {
got, err := ParseFQN(tt.name)
require.NoError(t, err)
t.Log(got)
require.Equal(t, tt.expected, got.Scope())
require.Equal(t, tt.expected, got.SemType())
})
}
}

@ -1,17 +1 @@
package types //nolint:revive
import (
"github.com/apache/arrow-go/v18/arrow"
)
// Pre-built metadata for the builtin message and timestamp columns.
var (
	ColumnMetadataBuiltinMessage = ColumnMetadata(ColumnTypeBuiltin, Loki.String)
	ColumnMetadataBuiltinTimestamp = ColumnMetadata(ColumnTypeBuiltin, Loki.Timestamp)
)
// ColumnMetadata builds the Arrow field metadata that encodes the column type
// and the Loki data type of a column.
func ColumnMetadata(ct ColumnType, dt DataType) arrow.Metadata {
	keys := []string{MetadataKeyColumnType, MetadataKeyColumnDataType}
	values := []string{ct.String(), dt.String()}
	return arrow.NewMetadata(keys, values)
}

Loading…
Cancel
Save