mirror of https://github.com/grafana/loki
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
598 lines
22 KiB
598 lines
22 KiB
package executor
|
|
|
|
import (
|
|
"errors"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/apache/arrow-go/v18/arrow"
|
|
"github.com/apache/arrow-go/v18/arrow/memory"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
|
|
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
|
|
"github.com/grafana/loki/v3/pkg/engine/internal/types"
|
|
"github.com/grafana/loki/v3/pkg/util/arrowtest"
|
|
)
|
|
|
|
func TestNewColumnCompatibilityPipeline(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
compat *physical.ColumnCompat
|
|
schema *arrow.Schema
|
|
inputRows []arrowtest.Rows
|
|
expectedSchema *arrow.Schema
|
|
expectedRows []arrowtest.Rows
|
|
expectError bool
|
|
errorContains string
|
|
}{
|
|
{
|
|
name: "no column collisions - returns early",
|
|
compat: &physical.ColumnCompat{
|
|
Collisions: []types.ColumnType{types.ColumnTypeLabel},
|
|
Source: types.ColumnTypeMetadata,
|
|
Destination: types.ColumnTypeMetadata,
|
|
},
|
|
schema: arrow.NewSchema([]arrow.Field{
|
|
semconv.FieldFromFQN("utf8.builtin.message", true),
|
|
semconv.FieldFromFQN("timestamp_ns.builtin.timestamp", false),
|
|
semconv.FieldFromFQN("utf8.label.service", true),
|
|
semconv.FieldFromFQN("utf8.metadata.detected_level", true),
|
|
semconv.FieldFromFQN("utf8.metadata.org_id", true),
|
|
}, nil),
|
|
inputRows: []arrowtest.Rows{
|
|
{
|
|
{"utf8.builtin.message": "test message", "timestamp_ns.builtin.timestamp": time.Unix(1000, 0).UTC(), "utf8.label.service": "api", "utf8.metadata.detected_level": "info", "utf8.metadata.org_id": "1"},
|
|
},
|
|
},
|
|
expectedSchema: arrow.NewSchema([]arrow.Field{
|
|
semconv.FieldFromFQN("utf8.builtin.message", true),
|
|
semconv.FieldFromFQN("timestamp_ns.builtin.timestamp", false),
|
|
semconv.FieldFromFQN("utf8.label.service", true),
|
|
semconv.FieldFromFQN("utf8.metadata.detected_level", true),
|
|
semconv.FieldFromFQN("utf8.metadata.org_id", true),
|
|
}, nil),
|
|
expectedRows: []arrowtest.Rows{
|
|
{
|
|
{"utf8.builtin.message": "test message", "timestamp_ns.builtin.timestamp": time.Unix(1000, 0).UTC(), "utf8.label.service": "api", "utf8.metadata.detected_level": "info", "utf8.metadata.org_id": "1"},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "single column collision - string type",
|
|
compat: &physical.ColumnCompat{
|
|
Collisions: []types.ColumnType{types.ColumnTypeLabel},
|
|
Source: types.ColumnTypeMetadata,
|
|
Destination: types.ColumnTypeMetadata,
|
|
},
|
|
schema: arrow.NewSchema([]arrow.Field{
|
|
semconv.FieldFromFQN("utf8.builtin.message", true),
|
|
semconv.FieldFromFQN("utf8.label.env", true),
|
|
semconv.FieldFromFQN("utf8.label.status", true), // collision column
|
|
semconv.FieldFromFQN("utf8.metadata.status", true), // source column with same name
|
|
}, nil),
|
|
inputRows: []arrowtest.Rows{
|
|
{
|
|
{"utf8.builtin.message": "line 1", "utf8.label.env": "dev", "utf8.label.status": "success", "utf8.metadata.status": "200"},
|
|
{"utf8.builtin.message": "line 2", "utf8.label.env": "dev", "utf8.label.status": "failure", "utf8.metadata.status": "500"},
|
|
},
|
|
},
|
|
expectedSchema: arrow.NewSchema([]arrow.Field{
|
|
semconv.FieldFromFQN("utf8.builtin.message", true),
|
|
semconv.FieldFromFQN("utf8.label.env", true),
|
|
semconv.FieldFromFQN("utf8.label.status", true),
|
|
semconv.FieldFromFQN("utf8.metadata.status", true),
|
|
semconv.FieldFromFQN("utf8.metadata.status_extracted", true), // new extracted column
|
|
}, nil),
|
|
expectedRows: []arrowtest.Rows{
|
|
{
|
|
{"utf8.builtin.message": "line 1", "utf8.label.env": "dev", "utf8.label.status": "success", "utf8.metadata.status": nil, "utf8.metadata.status_extracted": "200"},
|
|
{"utf8.builtin.message": "line 2", "utf8.label.env": "dev", "utf8.label.status": "failure", "utf8.metadata.status": nil, "utf8.metadata.status_extracted": "500"},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "multiple column collisions",
|
|
compat: &physical.ColumnCompat{
|
|
Collisions: []types.ColumnType{types.ColumnTypeLabel},
|
|
Source: types.ColumnTypeMetadata,
|
|
Destination: types.ColumnTypeMetadata,
|
|
},
|
|
schema: arrow.NewSchema([]arrow.Field{
|
|
semconv.FieldFromFQN("utf8.builtin.message", true),
|
|
semconv.FieldFromFQN("utf8.label.status", true), // collision column 1
|
|
semconv.FieldFromFQN("utf8.label.level", true), // collision column 2
|
|
semconv.FieldFromFQN("utf8.metadata.status", true), // source column 1
|
|
semconv.FieldFromFQN("utf8.metadata.level", true), // source column 2
|
|
semconv.FieldFromFQN("utf8.metadata.unique", true), // non-colliding source column
|
|
}, nil),
|
|
inputRows: []arrowtest.Rows{
|
|
{
|
|
{
|
|
"utf8.builtin.message": "test message",
|
|
"utf8.label.status": "active",
|
|
"utf8.label.level": "prod",
|
|
"utf8.metadata.status": "200",
|
|
"utf8.metadata.level": "info",
|
|
"utf8.metadata.unique": "value",
|
|
},
|
|
},
|
|
},
|
|
expectedSchema: arrow.NewSchema([]arrow.Field{
|
|
semconv.FieldFromFQN("utf8.builtin.message", true),
|
|
semconv.FieldFromFQN("utf8.label.status", true),
|
|
semconv.FieldFromFQN("utf8.label.level", true),
|
|
semconv.FieldFromFQN("utf8.metadata.status", true),
|
|
semconv.FieldFromFQN("utf8.metadata.level", true),
|
|
semconv.FieldFromFQN("utf8.metadata.unique", true),
|
|
semconv.FieldFromFQN("utf8.metadata.level_extracted", true), // sorted by name: level comes before status
|
|
semconv.FieldFromFQN("utf8.metadata.status_extracted", true),
|
|
}, nil),
|
|
expectedRows: []arrowtest.Rows{
|
|
{
|
|
{
|
|
"utf8.builtin.message": "test message",
|
|
"utf8.label.status": "active",
|
|
"utf8.label.level": "prod",
|
|
"utf8.metadata.status": nil, // nullified due to collision
|
|
"utf8.metadata.level": nil, // nullified due to collision
|
|
"utf8.metadata.unique": "value", // preserved as no collision
|
|
"utf8.metadata.level_extracted": "info",
|
|
"utf8.metadata.status_extracted": "200",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "collision with null values in collision column",
|
|
compat: &physical.ColumnCompat{
|
|
Collisions: []types.ColumnType{types.ColumnTypeLabel},
|
|
Source: types.ColumnTypeMetadata,
|
|
Destination: types.ColumnTypeMetadata,
|
|
},
|
|
schema: arrow.NewSchema([]arrow.Field{
|
|
semconv.FieldFromFQN("utf8.label.status", true),
|
|
semconv.FieldFromFQN("utf8.metadata.status", true),
|
|
}, nil),
|
|
inputRows: []arrowtest.Rows{
|
|
{
|
|
{"utf8.label.status": nil, "utf8.metadata.status": "200"}, // null collision column
|
|
{"utf8.label.status": "active", "utf8.metadata.status": nil}, // null source column
|
|
{"utf8.label.status": nil, "utf8.metadata.status": nil}, // both null
|
|
{"utf8.label.status": "active", "utf8.metadata.status": "200"}, // both non-null
|
|
},
|
|
},
|
|
expectedSchema: arrow.NewSchema([]arrow.Field{
|
|
semconv.FieldFromFQN("utf8.label.status", true),
|
|
semconv.FieldFromFQN("utf8.metadata.status", true),
|
|
semconv.FieldFromFQN("utf8.metadata.status_extracted", true),
|
|
}, nil),
|
|
expectedRows: []arrowtest.Rows{
|
|
{
|
|
{"utf8.label.status": nil, "utf8.metadata.status": "200", "utf8.metadata.status_extracted": nil}, // null collision -> null extraction
|
|
{"utf8.label.status": "active", "utf8.metadata.status": nil, "utf8.metadata.status_extracted": nil}, // null source -> null extraction
|
|
{"utf8.label.status": nil, "utf8.metadata.status": nil, "utf8.metadata.status_extracted": nil}, // both null -> null extraction
|
|
{"utf8.label.status": "active", "utf8.metadata.status": nil, "utf8.metadata.status_extracted": "200"}, // both non-null -> extract
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "collision with null values in source column",
|
|
compat: &physical.ColumnCompat{
|
|
Collisions: []types.ColumnType{types.ColumnTypeLabel},
|
|
Source: types.ColumnTypeMetadata,
|
|
Destination: types.ColumnTypeMetadata,
|
|
},
|
|
schema: arrow.NewSchema([]arrow.Field{
|
|
semconv.FieldFromFQN("utf8.label.status", true),
|
|
semconv.FieldFromFQN("utf8.metadata.status", true),
|
|
}, nil),
|
|
inputRows: []arrowtest.Rows{
|
|
{
|
|
{"utf8.label.status": "active", "utf8.metadata.status": nil}, // null source column
|
|
},
|
|
},
|
|
expectedSchema: arrow.NewSchema([]arrow.Field{
|
|
semconv.FieldFromFQN("utf8.label.status", true),
|
|
semconv.FieldFromFQN("utf8.metadata.status", true),
|
|
semconv.FieldFromFQN("utf8.metadata.status_extracted", true),
|
|
}, nil),
|
|
expectedRows: []arrowtest.Rows{
|
|
{
|
|
{"utf8.label.status": "active", "utf8.metadata.status": nil, "utf8.metadata.status_extracted": nil},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "multiple batches with collisions",
|
|
compat: &physical.ColumnCompat{
|
|
Collisions: []types.ColumnType{types.ColumnTypeLabel},
|
|
Source: types.ColumnTypeMetadata,
|
|
Destination: types.ColumnTypeMetadata,
|
|
},
|
|
schema: arrow.NewSchema([]arrow.Field{
|
|
semconv.FieldFromFQN("utf8.label.status", true),
|
|
semconv.FieldFromFQN("utf8.metadata.status", true),
|
|
}, nil),
|
|
inputRows: []arrowtest.Rows{
|
|
{
|
|
{"utf8.label.status": "active", "utf8.metadata.status": "200"},
|
|
{"utf8.label.status": "inactive", "utf8.metadata.status": "404"},
|
|
},
|
|
{
|
|
{"utf8.label.status": "pending", "utf8.metadata.status": "202"},
|
|
},
|
|
},
|
|
expectedSchema: arrow.NewSchema([]arrow.Field{
|
|
semconv.FieldFromFQN("utf8.label.status", true),
|
|
semconv.FieldFromFQN("utf8.metadata.status", true),
|
|
semconv.FieldFromFQN("utf8.metadata.status_extracted", true),
|
|
}, nil),
|
|
expectedRows: []arrowtest.Rows{
|
|
{
|
|
{"utf8.label.status": "active", "utf8.metadata.status": nil, "utf8.metadata.status_extracted": "200"},
|
|
{"utf8.label.status": "inactive", "utf8.metadata.status": nil, "utf8.metadata.status_extracted": "404"},
|
|
},
|
|
{
|
|
{"utf8.label.status": "pending", "utf8.metadata.status": nil, "utf8.metadata.status_extracted": "202"},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "empty batch does not add _extracted column",
|
|
compat: &physical.ColumnCompat{
|
|
Collisions: []types.ColumnType{types.ColumnTypeLabel},
|
|
Source: types.ColumnTypeMetadata,
|
|
Destination: types.ColumnTypeMetadata,
|
|
},
|
|
schema: arrow.NewSchema([]arrow.Field{
|
|
semconv.FieldFromFQN("utf8.label.status", true),
|
|
semconv.FieldFromFQN("utf8.metadata.status", true),
|
|
}, nil),
|
|
inputRows: []arrowtest.Rows{
|
|
{}, // empty batch
|
|
},
|
|
expectedSchema: arrow.NewSchema([]arrow.Field{
|
|
semconv.FieldFromFQN("utf8.label.status", true),
|
|
semconv.FieldFromFQN("utf8.metadata.status", true),
|
|
}, nil),
|
|
expectedRows: []arrowtest.Rows{
|
|
{}, // empty result
|
|
},
|
|
},
|
|
{
|
|
name: "non-string column types - should copy through unchanged",
|
|
compat: &physical.ColumnCompat{
|
|
Collisions: []types.ColumnType{types.ColumnTypeLabel},
|
|
Source: types.ColumnTypeMetadata,
|
|
Destination: types.ColumnTypeMetadata,
|
|
},
|
|
schema: arrow.NewSchema([]arrow.Field{
|
|
semconv.FieldFromFQN("utf8.builtin.message", true),
|
|
semconv.FieldFromFQN("timestamp_ns.builtin.timestamp", false),
|
|
semconv.FieldFromFQN("float64.builtin.value", false),
|
|
semconv.FieldFromFQN("int64.builtin.count", false),
|
|
semconv.FieldFromFQN("utf8.label.status", true), // collision column
|
|
semconv.FieldFromFQN("utf8.metadata.status", true), // source column
|
|
}, nil),
|
|
inputRows: []arrowtest.Rows{
|
|
{
|
|
{
|
|
"utf8.builtin.message": "test message",
|
|
"timestamp_ns.builtin.timestamp": time.Unix(1000000, 0).UTC(),
|
|
"float64.builtin.value": 3.14,
|
|
"int64.builtin.count": int64(42),
|
|
"utf8.label.status": "active",
|
|
"utf8.metadata.status": "200",
|
|
},
|
|
},
|
|
},
|
|
expectedSchema: arrow.NewSchema([]arrow.Field{
|
|
semconv.FieldFromFQN("utf8.builtin.message", true),
|
|
semconv.FieldFromFQN("timestamp_ns.builtin.timestamp", false),
|
|
semconv.FieldFromFQN("float64.builtin.value", false),
|
|
semconv.FieldFromFQN("int64.builtin.count", false),
|
|
semconv.FieldFromFQN("utf8.label.status", true),
|
|
semconv.FieldFromFQN("utf8.metadata.status", true),
|
|
semconv.FieldFromFQN("utf8.metadata.status_extracted", true),
|
|
}, nil),
|
|
expectedRows: []arrowtest.Rows{
|
|
{
|
|
{
|
|
"utf8.builtin.message": "test message",
|
|
"timestamp_ns.builtin.timestamp": time.Unix(1000000, 0).UTC(),
|
|
"float64.builtin.value": 3.14,
|
|
"int64.builtin.count": int64(42),
|
|
"utf8.label.status": "active",
|
|
"utf8.metadata.status": nil,
|
|
"utf8.metadata.status_extracted": "200",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "multiple collision types - label and metadata",
|
|
compat: &physical.ColumnCompat{
|
|
Collisions: []types.ColumnType{types.ColumnTypeLabel, types.ColumnTypeMetadata},
|
|
Source: types.ColumnTypeParsed,
|
|
Destination: types.ColumnTypeParsed,
|
|
},
|
|
schema: arrow.NewSchema([]arrow.Field{
|
|
semconv.FieldFromFQN("utf8.builtin.message", true),
|
|
// Collision columns from Label type
|
|
semconv.FieldFromFQN("utf8.label.status", true),
|
|
semconv.FieldFromFQN("utf8.label.level", true),
|
|
// Collision columns from Metadata type
|
|
semconv.FieldFromFQN("utf8.metadata.status", true),
|
|
semconv.FieldFromFQN("utf8.metadata.env", true),
|
|
// Source columns (Parsed) that collide with both types
|
|
semconv.FieldFromFQN("utf8.parsed.status", true), // collides with both label.status and metadata.status
|
|
semconv.FieldFromFQN("utf8.parsed.level", true), // collides with label.level
|
|
semconv.FieldFromFQN("utf8.parsed.env", true), // collides with metadata.env
|
|
semconv.FieldFromFQN("utf8.parsed.unique", true), // no collision
|
|
}, nil),
|
|
inputRows: []arrowtest.Rows{
|
|
{
|
|
{
|
|
"utf8.builtin.message": "test message",
|
|
"utf8.label.status": "active",
|
|
"utf8.label.level": "debug",
|
|
"utf8.metadata.status": "200",
|
|
"utf8.metadata.env": "production",
|
|
"utf8.parsed.status": "ok",
|
|
"utf8.parsed.level": "info",
|
|
"utf8.parsed.env": "staging",
|
|
"utf8.parsed.unique": "value",
|
|
},
|
|
{
|
|
"utf8.builtin.message": "test message 2",
|
|
"utf8.label.status": "inactive",
|
|
"utf8.label.level": "debug",
|
|
"utf8.metadata.status": "404",
|
|
"utf8.metadata.env": "development",
|
|
"utf8.parsed.status": "error",
|
|
"utf8.parsed.level": "debug",
|
|
"utf8.parsed.env": "local",
|
|
"utf8.parsed.unique": "another",
|
|
},
|
|
// no duplicates as collision columns are null
|
|
{
|
|
"utf8.builtin.message": "test message 3",
|
|
"utf8.label.status": nil,
|
|
"utf8.label.level": nil,
|
|
"utf8.metadata.status": nil,
|
|
"utf8.metadata.env": nil,
|
|
"utf8.parsed.status": "error",
|
|
"utf8.parsed.level": "debug",
|
|
"utf8.parsed.env": "local",
|
|
"utf8.parsed.unique": "another",
|
|
},
|
|
{
|
|
"utf8.builtin.message": "test message 4",
|
|
"utf8.label.status": "inactive",
|
|
"utf8.label.level": "debug",
|
|
"utf8.metadata.status": nil,
|
|
"utf8.metadata.env": nil,
|
|
"utf8.parsed.status": "error",
|
|
"utf8.parsed.level": "info",
|
|
"utf8.parsed.env": "local",
|
|
"utf8.parsed.unique": "another",
|
|
},
|
|
},
|
|
},
|
|
expectedSchema: arrow.NewSchema([]arrow.Field{
|
|
semconv.FieldFromFQN("utf8.builtin.message", true),
|
|
semconv.FieldFromFQN("utf8.label.status", true),
|
|
semconv.FieldFromFQN("utf8.label.level", true),
|
|
semconv.FieldFromFQN("utf8.metadata.status", true),
|
|
semconv.FieldFromFQN("utf8.metadata.env", true),
|
|
semconv.FieldFromFQN("utf8.parsed.status", true),
|
|
semconv.FieldFromFQN("utf8.parsed.level", true),
|
|
semconv.FieldFromFQN("utf8.parsed.env", true),
|
|
semconv.FieldFromFQN("utf8.parsed.unique", true),
|
|
// Extracted columns (sorted by name)
|
|
semconv.FieldFromFQN("utf8.parsed.env_extracted", true),
|
|
semconv.FieldFromFQN("utf8.parsed.level_extracted", true),
|
|
semconv.FieldFromFQN("utf8.parsed.status_extracted", true),
|
|
}, nil),
|
|
expectedRows: []arrowtest.Rows{
|
|
{
|
|
{
|
|
"utf8.builtin.message": "test message",
|
|
"utf8.label.status": "active",
|
|
"utf8.label.level": "debug",
|
|
"utf8.metadata.status": "200",
|
|
"utf8.metadata.env": "production",
|
|
"utf8.parsed.status": nil,
|
|
"utf8.parsed.level": nil,
|
|
"utf8.parsed.env": nil,
|
|
"utf8.parsed.unique": "value",
|
|
"utf8.parsed.env_extracted": "staging",
|
|
"utf8.parsed.level_extracted": "info",
|
|
"utf8.parsed.status_extracted": "ok",
|
|
},
|
|
{
|
|
"utf8.builtin.message": "test message 2",
|
|
"utf8.label.status": "inactive",
|
|
"utf8.label.level": "debug",
|
|
"utf8.metadata.status": "404",
|
|
"utf8.metadata.env": "development",
|
|
"utf8.parsed.status": nil,
|
|
"utf8.parsed.level": nil,
|
|
"utf8.parsed.env": nil,
|
|
"utf8.parsed.unique": "another",
|
|
"utf8.parsed.env_extracted": "local",
|
|
"utf8.parsed.level_extracted": "debug",
|
|
"utf8.parsed.status_extracted": "error",
|
|
},
|
|
{
|
|
"utf8.builtin.message": "test message 3",
|
|
"utf8.label.status": nil,
|
|
"utf8.label.level": nil,
|
|
"utf8.metadata.status": nil,
|
|
"utf8.metadata.env": nil,
|
|
"utf8.parsed.status": "error",
|
|
"utf8.parsed.level": "debug",
|
|
"utf8.parsed.env": "local",
|
|
"utf8.parsed.unique": "another",
|
|
"utf8.parsed.env_extracted": nil,
|
|
"utf8.parsed.level_extracted": nil,
|
|
"utf8.parsed.status_extracted": nil,
|
|
},
|
|
{
|
|
"utf8.builtin.message": "test message 4",
|
|
"utf8.label.status": "inactive",
|
|
"utf8.label.level": "debug",
|
|
"utf8.metadata.status": nil,
|
|
"utf8.metadata.env": nil,
|
|
"utf8.parsed.status": nil,
|
|
"utf8.parsed.level": nil,
|
|
"utf8.parsed.env": "local",
|
|
"utf8.parsed.unique": "another",
|
|
"utf8.parsed.env_extracted": nil,
|
|
"utf8.parsed.level_extracted": "info",
|
|
"utf8.parsed.status_extracted": "error",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
// Create input pipeline
|
|
var input Pipeline
|
|
if len(tt.inputRows) == 0 {
|
|
input = emptyPipeline()
|
|
} else if len(tt.inputRows) == 1 {
|
|
input = NewArrowtestPipeline(tt.schema, tt.inputRows[0])
|
|
} else {
|
|
// Multiple batches
|
|
var records []arrow.RecordBatch
|
|
for _, rows := range tt.inputRows {
|
|
record := rows.Record(memory.DefaultAllocator, tt.schema)
|
|
records = append(records, record)
|
|
}
|
|
input = NewBufferedPipeline(records...)
|
|
}
|
|
|
|
// Create compatibility pipeline
|
|
pipeline := newColumnCompatibilityPipeline(tt.compat, input, nil)
|
|
defer pipeline.Close()
|
|
|
|
if tt.expectError {
|
|
// Test error case
|
|
_, err := pipeline.Read(t.Context())
|
|
require.Error(t, err)
|
|
if tt.errorContains != "" {
|
|
require.ErrorContains(t, err, tt.errorContains)
|
|
}
|
|
return
|
|
}
|
|
|
|
// Test successful cases
|
|
var actualRows []arrowtest.Rows
|
|
batchCount := 0
|
|
|
|
for {
|
|
record, err := pipeline.Read(t.Context())
|
|
if err == EOF {
|
|
break
|
|
}
|
|
require.NoError(t, err)
|
|
|
|
// Verify schema matches expected (only check first batch)
|
|
if batchCount == 0 && tt.expectedSchema != nil {
|
|
require.True(t, tt.expectedSchema.Equal(record.Schema()),
|
|
"Schema mismatch.\nExpected: %s\nActual: %s",
|
|
tt.expectedSchema, record.Schema())
|
|
}
|
|
|
|
// Convert record to rows for comparison
|
|
rows, err := arrowtest.RecordRows(record)
|
|
require.NoError(t, err)
|
|
actualRows = append(actualRows, rows)
|
|
batchCount++
|
|
}
|
|
|
|
// Verify expected number of batches
|
|
require.Len(t, actualRows, len(tt.expectedRows), "unexpected number of batches")
|
|
|
|
// Verify row content
|
|
for i, expectedBatchRows := range tt.expectedRows {
|
|
require.Equal(t, expectedBatchRows, actualRows[i], "batch %d content mismatch", i)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestNewColumnCompatibilityPipeline_ErrorCases(t *testing.T) {
|
|
t.Run("invalid field name in schema", func(t *testing.T) {
|
|
compat := &physical.ColumnCompat{
|
|
Collisions: []types.ColumnType{types.ColumnTypeLabel},
|
|
Source: types.ColumnTypeMetadata,
|
|
Destination: types.ColumnTypeMetadata,
|
|
}
|
|
|
|
// Create schema with invalid field name that cannot be parsed by semconv.ParseFQN
|
|
schema := arrow.NewSchema([]arrow.Field{
|
|
{Name: "invalid-field-name", Type: types.Arrow.String, Nullable: true},
|
|
}, nil)
|
|
|
|
input := NewArrowtestPipeline(schema, arrowtest.Rows{
|
|
{"invalid-field-name": "test"},
|
|
})
|
|
|
|
pipeline := newColumnCompatibilityPipeline(compat, input, nil)
|
|
defer pipeline.Close()
|
|
|
|
_, err := pipeline.Read(t.Context())
|
|
require.Error(t, err)
|
|
// Should contain parsing error from semconv.ParseFQN
|
|
})
|
|
|
|
t.Run("input pipeline error", func(t *testing.T) {
|
|
compat := &physical.ColumnCompat{
|
|
Collisions: []types.ColumnType{types.ColumnTypeLabel},
|
|
Source: types.ColumnTypeMetadata,
|
|
Destination: types.ColumnTypeMetadata,
|
|
}
|
|
|
|
// Create a pipeline that will return an expected error
|
|
expectedErr := errors.New("test error")
|
|
input := errorPipeline(t.Context(), expectedErr)
|
|
|
|
pipeline := newColumnCompatibilityPipeline(compat, input, nil)
|
|
defer pipeline.Close()
|
|
|
|
_, err := pipeline.Read(t.Context())
|
|
require.ErrorIs(t, err, expectedErr)
|
|
})
|
|
|
|
t.Run("non-string collision column should panic", func(t *testing.T) {
|
|
compat := &physical.ColumnCompat{
|
|
Collisions: []types.ColumnType{types.ColumnTypeLabel},
|
|
Source: types.ColumnTypeMetadata,
|
|
Destination: types.ColumnTypeMetadata,
|
|
}
|
|
|
|
// Create schema where source column is not string type
|
|
// This should trigger the panic in the switch statement for non-string columns
|
|
schema := arrow.NewSchema([]arrow.Field{
|
|
semconv.FieldFromFQN("utf8.label.status", false), // collision column - string
|
|
semconv.FieldFromFQN("int64.metadata.status", true), // source column - not string
|
|
}, nil)
|
|
|
|
input := NewArrowtestPipeline(schema, arrowtest.Rows{
|
|
{"utf8.label.status": "200", "int64.metadata.status": int64(200)},
|
|
})
|
|
|
|
pipeline := newColumnCompatibilityPipeline(compat, input, nil)
|
|
defer pipeline.Close()
|
|
|
|
// This should panic with "invalid column type: only string columns can be checked for collisions"
|
|
require.Panics(t, func() {
|
|
_, _ = pipeline.Read(t.Context())
|
|
})
|
|
})
|
|
}
|
|
|