package executor import ( "errors" "testing" "time" "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/arrow/memory" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/engine/internal/planner/physical" "github.com/grafana/loki/v3/pkg/engine/internal/semconv" "github.com/grafana/loki/v3/pkg/engine/internal/types" "github.com/grafana/loki/v3/pkg/util/arrowtest" ) func TestNewColumnCompatibilityPipeline(t *testing.T) { tests := []struct { name string compat *physical.ColumnCompat schema *arrow.Schema inputRows []arrowtest.Rows expectedSchema *arrow.Schema expectedRows []arrowtest.Rows expectError bool errorContains string }{ { name: "no column collisions - returns early", compat: &physical.ColumnCompat{ Collisions: []types.ColumnType{types.ColumnTypeLabel}, Source: types.ColumnTypeMetadata, Destination: types.ColumnTypeMetadata, }, schema: arrow.NewSchema([]arrow.Field{ semconv.FieldFromFQN("utf8.builtin.message", true), semconv.FieldFromFQN("timestamp_ns.builtin.timestamp", false), semconv.FieldFromFQN("utf8.label.service", true), semconv.FieldFromFQN("utf8.metadata.detected_level", true), semconv.FieldFromFQN("utf8.metadata.org_id", true), }, nil), inputRows: []arrowtest.Rows{ { {"utf8.builtin.message": "test message", "timestamp_ns.builtin.timestamp": time.Unix(1000, 0).UTC(), "utf8.label.service": "api", "utf8.metadata.detected_level": "info", "utf8.metadata.org_id": "1"}, }, }, expectedSchema: arrow.NewSchema([]arrow.Field{ semconv.FieldFromFQN("utf8.builtin.message", true), semconv.FieldFromFQN("timestamp_ns.builtin.timestamp", false), semconv.FieldFromFQN("utf8.label.service", true), semconv.FieldFromFQN("utf8.metadata.detected_level", true), semconv.FieldFromFQN("utf8.metadata.org_id", true), }, nil), expectedRows: []arrowtest.Rows{ { {"utf8.builtin.message": "test message", "timestamp_ns.builtin.timestamp": time.Unix(1000, 0).UTC(), "utf8.label.service": "api", "utf8.metadata.detected_level": "info", "utf8.metadata.org_id": "1"}, }, }, }, { name: "single column collision - string type", compat: &physical.ColumnCompat{ Collisions: []types.ColumnType{types.ColumnTypeLabel}, Source: types.ColumnTypeMetadata, Destination: types.ColumnTypeMetadata, }, schema: arrow.NewSchema([]arrow.Field{ semconv.FieldFromFQN("utf8.builtin.message", true), semconv.FieldFromFQN("utf8.label.env", true), semconv.FieldFromFQN("utf8.label.status", true), // collision column semconv.FieldFromFQN("utf8.metadata.status", true), // source column with same name }, nil), inputRows: []arrowtest.Rows{ { {"utf8.builtin.message": "line 1", "utf8.label.env": "dev", "utf8.label.status": "success", "utf8.metadata.status": "200"}, {"utf8.builtin.message": "line 2", "utf8.label.env": "dev", "utf8.label.status": "failure", "utf8.metadata.status": "500"}, }, }, expectedSchema: arrow.NewSchema([]arrow.Field{ semconv.FieldFromFQN("utf8.builtin.message", true), semconv.FieldFromFQN("utf8.label.env", true), semconv.FieldFromFQN("utf8.label.status", true), semconv.FieldFromFQN("utf8.metadata.status", true), semconv.FieldFromFQN("utf8.metadata.status_extracted", true), // new extracted column }, nil), expectedRows: []arrowtest.Rows{ { {"utf8.builtin.message": "line 1", "utf8.label.env": "dev", "utf8.label.status": "success", "utf8.metadata.status": nil, "utf8.metadata.status_extracted": "200"}, {"utf8.builtin.message": "line 2", "utf8.label.env": "dev", "utf8.label.status": "failure", "utf8.metadata.status": nil, "utf8.metadata.status_extracted": "500"}, }, }, }, { name: "multiple column collisions", compat: &physical.ColumnCompat{ Collisions: []types.ColumnType{types.ColumnTypeLabel}, Source: types.ColumnTypeMetadata, Destination: types.ColumnTypeMetadata, }, schema: arrow.NewSchema([]arrow.Field{ semconv.FieldFromFQN("utf8.builtin.message", true), semconv.FieldFromFQN("utf8.label.status", true), // collision column 1 semconv.FieldFromFQN("utf8.label.level", true), // collision column 2 semconv.FieldFromFQN("utf8.metadata.status", true), // source column 1 semconv.FieldFromFQN("utf8.metadata.level", true), // source column 2 semconv.FieldFromFQN("utf8.metadata.unique", true), // non-colliding source column }, nil), inputRows: []arrowtest.Rows{ { { "utf8.builtin.message": "test message", "utf8.label.status": "active", "utf8.label.level": "prod", "utf8.metadata.status": "200", "utf8.metadata.level": "info", "utf8.metadata.unique": "value", }, }, }, expectedSchema: arrow.NewSchema([]arrow.Field{ semconv.FieldFromFQN("utf8.builtin.message", true), semconv.FieldFromFQN("utf8.label.status", true), semconv.FieldFromFQN("utf8.label.level", true), semconv.FieldFromFQN("utf8.metadata.status", true), semconv.FieldFromFQN("utf8.metadata.level", true), semconv.FieldFromFQN("utf8.metadata.unique", true), semconv.FieldFromFQN("utf8.metadata.level_extracted", true), // sorted by name: level comes before status semconv.FieldFromFQN("utf8.metadata.status_extracted", true), }, nil), expectedRows: []arrowtest.Rows{ { { "utf8.builtin.message": "test message", "utf8.label.status": "active", "utf8.label.level": "prod", "utf8.metadata.status": nil, // nullified due to collision "utf8.metadata.level": nil, // nullified due to collision "utf8.metadata.unique": "value", // preserved as no collision "utf8.metadata.level_extracted": "info", "utf8.metadata.status_extracted": "200", }, }, }, }, { name: "collision with null values in collision column", compat: &physical.ColumnCompat{ Collisions: []types.ColumnType{types.ColumnTypeLabel}, Source: types.ColumnTypeMetadata, Destination: types.ColumnTypeMetadata, }, schema: arrow.NewSchema([]arrow.Field{ semconv.FieldFromFQN("utf8.label.status", true), semconv.FieldFromFQN("utf8.metadata.status", true), }, nil), inputRows: []arrowtest.Rows{ { {"utf8.label.status": nil, "utf8.metadata.status": "200"}, // null collision column {"utf8.label.status": "active", "utf8.metadata.status": nil}, // null source column {"utf8.label.status": nil, "utf8.metadata.status": nil}, // both null {"utf8.label.status": "active", "utf8.metadata.status": "200"}, // both non-null }, }, expectedSchema: arrow.NewSchema([]arrow.Field{ semconv.FieldFromFQN("utf8.label.status", true), semconv.FieldFromFQN("utf8.metadata.status", true), semconv.FieldFromFQN("utf8.metadata.status_extracted", true), }, nil), expectedRows: []arrowtest.Rows{ { {"utf8.label.status": nil, "utf8.metadata.status": "200", "utf8.metadata.status_extracted": nil}, // null collision -> null extraction {"utf8.label.status": "active", "utf8.metadata.status": nil, "utf8.metadata.status_extracted": nil}, // null source -> null extraction {"utf8.label.status": nil, "utf8.metadata.status": nil, "utf8.metadata.status_extracted": nil}, // both null -> null extraction {"utf8.label.status": "active", "utf8.metadata.status": nil, "utf8.metadata.status_extracted": "200"}, // both non-null -> extract }, }, }, { name: "collision with null values in source column", compat: &physical.ColumnCompat{ Collisions: []types.ColumnType{types.ColumnTypeLabel}, Source: types.ColumnTypeMetadata, Destination: types.ColumnTypeMetadata, }, schema: arrow.NewSchema([]arrow.Field{ semconv.FieldFromFQN("utf8.label.status", true), semconv.FieldFromFQN("utf8.metadata.status", true), }, nil), inputRows: []arrowtest.Rows{ { {"utf8.label.status": "active", "utf8.metadata.status": nil}, // null source column }, }, expectedSchema: arrow.NewSchema([]arrow.Field{ semconv.FieldFromFQN("utf8.label.status", true), semconv.FieldFromFQN("utf8.metadata.status", true), semconv.FieldFromFQN("utf8.metadata.status_extracted", true), }, nil), expectedRows: []arrowtest.Rows{ { {"utf8.label.status": "active", "utf8.metadata.status": nil, "utf8.metadata.status_extracted": nil}, }, }, }, { name: "multiple batches with collisions", compat: &physical.ColumnCompat{ Collisions: []types.ColumnType{types.ColumnTypeLabel}, Source: types.ColumnTypeMetadata, Destination: types.ColumnTypeMetadata, }, schema: arrow.NewSchema([]arrow.Field{ semconv.FieldFromFQN("utf8.label.status", true), semconv.FieldFromFQN("utf8.metadata.status", true), }, nil), inputRows: []arrowtest.Rows{ { {"utf8.label.status": "active", "utf8.metadata.status": "200"}, {"utf8.label.status": "inactive", "utf8.metadata.status": "404"}, }, { {"utf8.label.status": "pending", "utf8.metadata.status": "202"}, }, }, expectedSchema: arrow.NewSchema([]arrow.Field{ semconv.FieldFromFQN("utf8.label.status", true), semconv.FieldFromFQN("utf8.metadata.status", true), semconv.FieldFromFQN("utf8.metadata.status_extracted", true), }, nil), expectedRows: []arrowtest.Rows{ { {"utf8.label.status": "active", "utf8.metadata.status": nil, "utf8.metadata.status_extracted": "200"}, {"utf8.label.status": "inactive", "utf8.metadata.status": nil, "utf8.metadata.status_extracted": "404"}, }, { {"utf8.label.status": "pending", "utf8.metadata.status": nil, "utf8.metadata.status_extracted": "202"}, }, }, }, { name: "empty batch does not add _extracted column", compat: &physical.ColumnCompat{ Collisions: []types.ColumnType{types.ColumnTypeLabel}, Source: types.ColumnTypeMetadata, Destination: types.ColumnTypeMetadata, }, schema: arrow.NewSchema([]arrow.Field{ semconv.FieldFromFQN("utf8.label.status", true), semconv.FieldFromFQN("utf8.metadata.status", true), }, nil), inputRows: []arrowtest.Rows{ {}, // empty batch }, expectedSchema: arrow.NewSchema([]arrow.Field{ semconv.FieldFromFQN("utf8.label.status", true), semconv.FieldFromFQN("utf8.metadata.status", true), }, nil), expectedRows: []arrowtest.Rows{ {}, // empty result }, }, { name: "non-string column types - should copy through unchanged", compat: &physical.ColumnCompat{ Collisions: []types.ColumnType{types.ColumnTypeLabel}, Source: types.ColumnTypeMetadata, Destination: types.ColumnTypeMetadata, }, schema: arrow.NewSchema([]arrow.Field{ semconv.FieldFromFQN("utf8.builtin.message", true), semconv.FieldFromFQN("timestamp_ns.builtin.timestamp", false), semconv.FieldFromFQN("float64.builtin.value", false), semconv.FieldFromFQN("int64.builtin.count", false), semconv.FieldFromFQN("utf8.label.status", true), // collision column semconv.FieldFromFQN("utf8.metadata.status", true), // source column }, nil), inputRows: []arrowtest.Rows{ { { "utf8.builtin.message": "test message", "timestamp_ns.builtin.timestamp": time.Unix(1000000, 0).UTC(), "float64.builtin.value": 3.14, "int64.builtin.count": int64(42), "utf8.label.status": "active", "utf8.metadata.status": "200", }, }, }, expectedSchema: arrow.NewSchema([]arrow.Field{ semconv.FieldFromFQN("utf8.builtin.message", true), semconv.FieldFromFQN("timestamp_ns.builtin.timestamp", false), semconv.FieldFromFQN("float64.builtin.value", false), semconv.FieldFromFQN("int64.builtin.count", false), semconv.FieldFromFQN("utf8.label.status", true), semconv.FieldFromFQN("utf8.metadata.status", true), semconv.FieldFromFQN("utf8.metadata.status_extracted", true), }, nil), expectedRows: []arrowtest.Rows{ { { "utf8.builtin.message": "test message", "timestamp_ns.builtin.timestamp": time.Unix(1000000, 0).UTC(), "float64.builtin.value": 3.14, "int64.builtin.count": int64(42), "utf8.label.status": "active", "utf8.metadata.status": nil, "utf8.metadata.status_extracted": "200", }, }, }, }, { name: "multiple collision types - label and metadata", compat: &physical.ColumnCompat{ Collisions: []types.ColumnType{types.ColumnTypeLabel, types.ColumnTypeMetadata}, Source: types.ColumnTypeParsed, Destination: types.ColumnTypeParsed, }, schema: arrow.NewSchema([]arrow.Field{ semconv.FieldFromFQN("utf8.builtin.message", true), // Collision columns from Label type semconv.FieldFromFQN("utf8.label.status", true), semconv.FieldFromFQN("utf8.label.level", true), // Collision columns from Metadata type semconv.FieldFromFQN("utf8.metadata.status", true), semconv.FieldFromFQN("utf8.metadata.env", true), // Source columns (Parsed) that collide with both types semconv.FieldFromFQN("utf8.parsed.status", true), // collides with both label.status and metadata.status semconv.FieldFromFQN("utf8.parsed.level", true), // collides with label.level semconv.FieldFromFQN("utf8.parsed.env", true), // collides with metadata.env semconv.FieldFromFQN("utf8.parsed.unique", true), // no collision }, nil), inputRows: []arrowtest.Rows{ { { "utf8.builtin.message": "test message", "utf8.label.status": "active", "utf8.label.level": "debug", "utf8.metadata.status": "200", "utf8.metadata.env": "production", "utf8.parsed.status": "ok", "utf8.parsed.level": "info", "utf8.parsed.env": "staging", "utf8.parsed.unique": "value", }, { "utf8.builtin.message": "test message 2", "utf8.label.status": "inactive", "utf8.label.level": "debug", "utf8.metadata.status": "404", "utf8.metadata.env": "development", "utf8.parsed.status": "error", "utf8.parsed.level": "debug", "utf8.parsed.env": "local", "utf8.parsed.unique": "another", }, // no duplicates as collision columns are null { "utf8.builtin.message": "test message 3", "utf8.label.status": nil, "utf8.label.level": nil, "utf8.metadata.status": nil, "utf8.metadata.env": nil, "utf8.parsed.status": "error", "utf8.parsed.level": "debug", "utf8.parsed.env": "local", "utf8.parsed.unique": "another", }, { "utf8.builtin.message": "test message 4", "utf8.label.status": "inactive", "utf8.label.level": "debug", "utf8.metadata.status": nil, "utf8.metadata.env": nil, "utf8.parsed.status": "error", "utf8.parsed.level": "info", "utf8.parsed.env": "local", "utf8.parsed.unique": "another", }, }, }, expectedSchema: arrow.NewSchema([]arrow.Field{ semconv.FieldFromFQN("utf8.builtin.message", true), semconv.FieldFromFQN("utf8.label.status", true), semconv.FieldFromFQN("utf8.label.level", true), semconv.FieldFromFQN("utf8.metadata.status", true), semconv.FieldFromFQN("utf8.metadata.env", true), semconv.FieldFromFQN("utf8.parsed.status", true), semconv.FieldFromFQN("utf8.parsed.level", true), semconv.FieldFromFQN("utf8.parsed.env", true), semconv.FieldFromFQN("utf8.parsed.unique", true), // Extracted columns (sorted by name) semconv.FieldFromFQN("utf8.parsed.env_extracted", true), semconv.FieldFromFQN("utf8.parsed.level_extracted", true), semconv.FieldFromFQN("utf8.parsed.status_extracted", true), }, nil), expectedRows: []arrowtest.Rows{ { { "utf8.builtin.message": "test message", "utf8.label.status": "active", "utf8.label.level": "debug", "utf8.metadata.status": "200", "utf8.metadata.env": "production", "utf8.parsed.status": nil, "utf8.parsed.level": nil, "utf8.parsed.env": nil, "utf8.parsed.unique": "value", "utf8.parsed.env_extracted": "staging", "utf8.parsed.level_extracted": "info", "utf8.parsed.status_extracted": "ok", }, { "utf8.builtin.message": "test message 2", "utf8.label.status": "inactive", "utf8.label.level": "debug", "utf8.metadata.status": "404", "utf8.metadata.env": "development", "utf8.parsed.status": nil, "utf8.parsed.level": nil, "utf8.parsed.env": nil, "utf8.parsed.unique": "another", "utf8.parsed.env_extracted": "local", "utf8.parsed.level_extracted": "debug", "utf8.parsed.status_extracted": "error", }, { "utf8.builtin.message": "test message 3", "utf8.label.status": nil, "utf8.label.level": nil, "utf8.metadata.status": nil, "utf8.metadata.env": nil, "utf8.parsed.status": "error", "utf8.parsed.level": "debug", "utf8.parsed.env": "local", "utf8.parsed.unique": "another", "utf8.parsed.env_extracted": nil, "utf8.parsed.level_extracted": nil, "utf8.parsed.status_extracted": nil, }, { "utf8.builtin.message": "test message 4", "utf8.label.status": "inactive", "utf8.label.level": "debug", "utf8.metadata.status": nil, "utf8.metadata.env": nil, "utf8.parsed.status": nil, "utf8.parsed.level": nil, "utf8.parsed.env": "local", "utf8.parsed.unique": "another", "utf8.parsed.env_extracted": nil, "utf8.parsed.level_extracted": "info", "utf8.parsed.status_extracted": "error", }, }, }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { // Create input pipeline var input Pipeline if len(tt.inputRows) == 0 { input = emptyPipeline() } else if len(tt.inputRows) == 1 { input = NewArrowtestPipeline(tt.schema, tt.inputRows[0]) } else { // Multiple batches var records []arrow.RecordBatch for _, rows := range tt.inputRows { record := rows.Record(memory.DefaultAllocator, tt.schema) records = append(records, record) } input = NewBufferedPipeline(records...) } // Create compatibility pipeline pipeline := newColumnCompatibilityPipeline(tt.compat, input, nil) defer pipeline.Close() if tt.expectError { // Test error case _, err := pipeline.Read(t.Context()) require.Error(t, err) if tt.errorContains != "" { require.ErrorContains(t, err, tt.errorContains) } return } // Test successful cases var actualRows []arrowtest.Rows batchCount := 0 for { record, err := pipeline.Read(t.Context()) if err == EOF { break } require.NoError(t, err) // Verify schema matches expected (only check first batch) if batchCount == 0 && tt.expectedSchema != nil { require.True(t, tt.expectedSchema.Equal(record.Schema()), "Schema mismatch.\nExpected: %s\nActual: %s", tt.expectedSchema, record.Schema()) } // Convert record to rows for comparison rows, err := arrowtest.RecordRows(record) require.NoError(t, err) actualRows = append(actualRows, rows) batchCount++ } // Verify expected number of batches require.Len(t, actualRows, len(tt.expectedRows), "unexpected number of batches") // Verify row content for i, expectedBatchRows := range tt.expectedRows { require.Equal(t, expectedBatchRows, actualRows[i], "batch %d content mismatch", i) } }) } } func TestNewColumnCompatibilityPipeline_ErrorCases(t *testing.T) { t.Run("invalid field name in schema", func(t *testing.T) { compat := &physical.ColumnCompat{ Collisions: []types.ColumnType{types.ColumnTypeLabel}, Source: types.ColumnTypeMetadata, Destination: types.ColumnTypeMetadata, } // Create schema with invalid field name that cannot be parsed by semconv.ParseFQN schema := arrow.NewSchema([]arrow.Field{ {Name: "invalid-field-name", Type: types.Arrow.String, Nullable: true}, }, nil) input := NewArrowtestPipeline(schema, arrowtest.Rows{ {"invalid-field-name": "test"}, }) pipeline := newColumnCompatibilityPipeline(compat, input, nil) defer pipeline.Close() _, err := pipeline.Read(t.Context()) require.Error(t, err) // Should contain parsing error from semconv.ParseFQN }) t.Run("input pipeline error", func(t *testing.T) { compat := &physical.ColumnCompat{ Collisions: []types.ColumnType{types.ColumnTypeLabel}, Source: types.ColumnTypeMetadata, Destination: types.ColumnTypeMetadata, } // Create a pipeline that will return an expected error expectedErr := errors.New("test error") input := errorPipeline(t.Context(), expectedErr) pipeline := newColumnCompatibilityPipeline(compat, input, nil) defer pipeline.Close() _, err := pipeline.Read(t.Context()) require.ErrorIs(t, err, expectedErr) }) t.Run("non-string collision column should panic", func(t *testing.T) { compat := &physical.ColumnCompat{ Collisions: []types.ColumnType{types.ColumnTypeLabel}, Source: types.ColumnTypeMetadata, Destination: types.ColumnTypeMetadata, } // Create schema where source column is not string type // This should trigger the panic in the switch statement for non-string columns schema := arrow.NewSchema([]arrow.Field{ semconv.FieldFromFQN("utf8.label.status", false), // collision column - string semconv.FieldFromFQN("int64.metadata.status", true), // source column - not string }, nil) input := NewArrowtestPipeline(schema, arrowtest.Rows{ {"utf8.label.status": "200", "int64.metadata.status": int64(200)}, }) pipeline := newColumnCompatibilityPipeline(compat, input, nil) defer pipeline.Close() // This should panic with "invalid column type: only string columns can be checked for collisions" require.Panics(t, func() { _, _ = pipeline.Read(t.Context()) }) }) }