From 5acb96abbd43b24dd86d829f32a5c0aeee4275eb Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Wed, 4 Feb 2026 11:57:08 -0700 Subject: [PATCH] ci: add drilldown and case insensitive queries to correctness tests (#20603) Signed-off-by: Trevor Whitney --- pkg/engine/internal/arrowagg/mapper.go | 13 +- pkg/engine/internal/executor/compat.go | 73 ++++++- pkg/engine/internal/executor/compat_test.go | 78 +++++++ pkg/engine/internal/executor/project.go | 41 ++-- pkg/engine/internal/executor/project_test.go | 195 +++++++++++++++++- .../planner/logical/logical_optimize.go | 2 +- .../planner/logical/logical_optimize_test.go | 35 ++++ pkg/logql/bench/generator_query.go | 38 +++- 8 files changed, 437 insertions(+), 38 deletions(-) diff --git a/pkg/engine/internal/arrowagg/mapper.go b/pkg/engine/internal/arrowagg/mapper.go index 06384fb462..3a40fc332c 100644 --- a/pkg/engine/internal/arrowagg/mapper.go +++ b/pkg/engine/internal/arrowagg/mapper.go @@ -81,14 +81,15 @@ func newMapping(schema *arrow.Schema, to []arrow.Field) *mapping { fieldIdxs := schema.FieldIndices(target.Name) if len(fieldIdxs) == 0 { continue - } else if len(fieldIdxs) > 1 { - // this should not occur as FQN should make field names unique. - panic("mapper: multiple fields with the same name in schema") } - // this check might be unnecessary given FQN uniqueness? - if schema.Field(fieldIdxs[0]).Equal(target) { - mapping.lookups[i] = fieldIdxs[0] + // Multiple parsers (e.g., logfmt and json) can create duplicate field names. + // Find the first field that matches the target exactly. + for _, idx := range fieldIdxs { + if schema.Field(idx).Equal(target) { + mapping.lookups[i] = idx + break + } } } diff --git a/pkg/engine/internal/executor/compat.go b/pkg/engine/internal/executor/compat.go index fa34bafea7..bfb2167cc9 100644 --- a/pkg/engine/internal/executor/compat.go +++ b/pkg/engine/internal/executor/compat.go @@ -87,9 +87,36 @@ func newColumnCompatibilityPipeline(compat *physical.ColumnCompat, input Pipelin // Next, update the schema with the new columns that have the _extracted suffix. oldSchema := batch.Schema() + + destinationNames := make(map[string]bool, len(duplicates)) + for i := range duplicates { + destinationNames[duplicates[i].name+extracted] = true + } + // Copy old fields, but skip any existing _extracted columns that we're about to recreate newFields := make([]arrow.Field, 0, oldSchema.NumFields()+len(duplicates)) - newFields = append(newFields, oldSchema.Fields()...) - r := int(batch.NumCols()) + oldFieldToNewIdx := make(map[int]int, oldSchema.NumFields()) + existingDestCols := make(map[string]*array.String, len(duplicates)) + + for oldIdx, field := range oldSchema.Fields() { + ident, err := identCache.ParseFQN(field.Name) + if err != nil { + oldFieldToNewIdx[oldIdx] = len(newFields) + newFields = append(newFields, field) + continue + } + + // Skip existing _extracted columns that we're going to recreate + // But save a reference to their data so we can preserve values + if ident.ColumnType() == compat.Destination && destinationNames[ident.ShortName()] { + existingDestCols[ident.ShortName()] = batch.Column(oldIdx).(*array.String) + continue + } + + oldFieldToNewIdx[oldIdx] = len(newFields) + newFields = append(newFields, field) + } + + // Now add the new _extracted columns for i := range duplicates { sourceFieldIdx := duplicates[i].sourceIdx sourceField := oldSchema.Field(sourceFieldIdx) @@ -100,7 +127,7 @@ func newColumnCompatibilityPipeline(compat *physical.ColumnCompat, input Pipelin destinationIdent := semconv.NewIdentifier(sourceIdent.ShortName()+extracted, compat.Destination, sourceIdent.DataType()) newFields = append(newFields, semconv.FieldFromIdent(destinationIdent, true)) - duplicates[i].destinationIdx = r + i + duplicates[i].destinationIdx = len(newFields) - 1 } // Create a new builder with the updated schema. @@ -114,14 +141,18 @@ func newColumnCompatibilityPipeline(compat *physical.ColumnCompat, input Pipelin newSchemaColumns := make([]arrow.Array, newSchema.NumFields()) // Now, go through all fields of the old schema and append the rows to the new builder. - for idx := range schema.NumFields() { - col := batch.Column(idx) + for oldIdx := range oldSchema.NumFields() { + col := batch.Column(oldIdx) - duplicateIdx := slices.IndexFunc(duplicates, func(d duplicateColumn) bool { return d.sourceIdx == idx }) + duplicateIdx := slices.IndexFunc(duplicates, func(d duplicateColumn) bool { return d.sourceIdx == oldIdx }) // If not a colliding column, just copy over the column data of the original record. if duplicateIdx < 0 { - newSchemaColumns[idx] = col + // Check if this column should be copied (not a skipped _extracted column) + if newIdx, ok := oldFieldToNewIdx[oldIdx]; ok { + newSchemaColumns[newIdx] = col + } + // If not in the map, it was skipped (existing _extracted column) continue } @@ -134,11 +165,33 @@ func newColumnCompatibilityPipeline(compat *physical.ColumnCompat, input Pipelin collisionCols[i] = batch.Column(collIdx) } - switch sourceFieldBuilder := builder.Field(idx).(type) { + // Get the new index for this source field + sourceNewIdx, ok := oldFieldToNewIdx[oldIdx] + if !ok { + // This shouldn't happen, but handle it gracefully + continue + } + + switch sourceFieldBuilder := builder.Field(sourceNewIdx).(type) { case *array.StringBuilder: destinationFieldBuilder := builder.Field(duplicate.destinationIdx).(*array.StringBuilder) + + // Check if there's an existing destination column (_extracted) in the input batch + // This happens when multiple ColumnCompat nodes run sequentially (e.g., | json | logfmt |) + existingDestCol := existingDestCols[duplicate.name+extracted] + for i := range int(batch.NumRows()) { - if col.IsNull(i) || !col.IsValid(i) { + // Preserve existing values over adding null + if existingDestCol != nil && !existingDestCol.IsNull(i) && existingDestCol.Value(i) != "" { + if col.IsNull(i) || !col.IsValid(i) { + sourceFieldBuilder.AppendNull() // append NULL to original column + } else { + sourceFieldBuilder.Append(col.(*array.String).Value(i)) // append value to original column + } + + existingVal := existingDestCol.Value(i) + destinationFieldBuilder.Append(existingVal) // append value to _extracted column + } else if col.IsNull(i) || !col.IsValid(i) { sourceFieldBuilder.AppendNull() // append NULL to original column destinationFieldBuilder.AppendNull() // append NULL to _extracted column } else if allColumnsNull(collisionCols, i) { @@ -154,7 +207,7 @@ func newColumnCompatibilityPipeline(compat *physical.ColumnCompat, input Pipelin } sourceCol := sourceFieldBuilder.NewArray() - newSchemaColumns[duplicate.sourceIdx] = sourceCol + newSchemaColumns[sourceNewIdx] = sourceCol destinationCol := destinationFieldBuilder.NewArray() newSchemaColumns[duplicate.destinationIdx] = destinationCol diff --git a/pkg/engine/internal/executor/compat_test.go b/pkg/engine/internal/executor/compat_test.go index fd33779aca..e80986b2d8 100644 --- a/pkg/engine/internal/executor/compat_test.go +++ b/pkg/engine/internal/executor/compat_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" "github.com/apache/arrow-go/v18/arrow/memory" "github.com/stretchr/testify/require" @@ -596,3 +597,80 @@ func TestNewColumnCompatibilityPipeline_ErrorCases(t *testing.T) { }) }) } + +// TestMultipleColumnCompatPreservesValues tests that when multiple ColumnCompat nodes run sequentially +// (as happens with multiple parsers like `| json | logfmt |`), the second ColumnCompat preserves +// values created by the first ColumnCompat instead of clobbering them. +func TestMultipleColumnCompatPreservesValues(t *testing.T) { + // Simulate a batch with mixed JSON and logfmt lines + // After JSON parser + ColumnCompat₁: + // - Row 0: level="info" from JSON, level_extracted="info" created + // - Row 1: level=NULL (logfmt line, JSON parsing failed), level_extracted=NULL + // + // After logfmt parser + ColumnCompat₂: + // - Row 0: level=NULL (JSON line, logfmt parsing failed), level_extracted should STAY "info" + // - Row 1: level="warn" from logfmt, level_extracted should be "warn" + + compat := &physical.ColumnCompat{ + Source: types.ColumnTypeParsed, + Destination: types.ColumnTypeParsed, + Collisions: []types.ColumnType{types.ColumnTypeMetadata}, + } + + // Step 1: Simulate state after JSON parser + ColumnCompat₁ + // Row 0: JSON line successfully parsed level="info", moved to level_extracted + // Row 1: logfmt line, JSON parsing failed, level=NULL, level_extracted=NULL + schemaAfterFirstCompat := arrow.NewSchema([]arrow.Field{ + semconv.FieldFromFQN("utf8.builtin.message", true), + semconv.FieldFromFQN("timestamp_ns.builtin.timestamp", false), + semconv.FieldFromFQN("utf8.metadata.level", true), // metadata level + semconv.FieldFromFQN("utf8.parsed.level", true), // parsed level (NULL for row 0, set by first ColumnCompat) + semconv.FieldFromFQN("utf8.parsed.level_extracted", true), // created by first ColumnCompat + }, nil) + + inputAfterFirstCompat := NewArrowtestPipeline(schemaAfterFirstCompat, arrowtest.Rows{ + { + "utf8.builtin.message": `{"level":"info","msg":"test"}`, + "timestamp_ns.builtin.timestamp": time.Unix(1000, 0).UTC(), + "utf8.metadata.level": "debug", // metadata level different from parsed + "utf8.parsed.level": nil, // NULL - was moved to level_extracted by first ColumnCompat + "utf8.parsed.level_extracted": "info", // created by first ColumnCompat + }, + { + "utf8.builtin.message": `level=warn msg="test"`, + "timestamp_ns.builtin.timestamp": time.Unix(1001, 0).UTC(), + "utf8.metadata.level": "debug", + "utf8.parsed.level": "warn", // NOW set by logfmt parser (simulated) + "utf8.parsed.level_extracted": nil, // Was NULL from first ColumnCompat + }, + }) + + // Step 2: Run second ColumnCompat (simulating logfmt parser's ColumnCompat) + pipeline := newColumnCompatibilityPipeline(compat, inputAfterFirstCompat, nil) + defer pipeline.Close() + + batch, err := pipeline.Read(t.Context()) + require.NoError(t, err) + require.NotNil(t, batch) + + levelExtractedIdx := -1 + for i := range batch.Schema().NumFields() { + if batch.Schema().Field(i).Name == "utf8.parsed.level_extracted" { + levelExtractedIdx = i + break + } + } + require.NotEqual(t, -1, levelExtractedIdx, "level_extracted column should exist") + + levelExtractedCol := batch.Column(levelExtractedIdx).(*array.String) + + // Row 0: Should preserve "info" from first ColumnCompat, NOT overwrite with NULL + require.False(t, levelExtractedCol.IsNull(0), "Row 0 level_extracted should not be NULL") + require.Equal(t, "info", levelExtractedCol.Value(0), + "Row 0 level_extracted should preserve 'info' from first ColumnCompat") + + // Row 1: Should have "warn" from second ColumnCompat + require.False(t, levelExtractedCol.IsNull(1), "Row 1 level_extracted should not be NULL") + require.Equal(t, "warn", levelExtractedCol.Value(1), + "Row 1 level_extracted should be 'warn' from second ColumnCompat") +} diff --git a/pkg/engine/internal/executor/project.go b/pkg/engine/internal/executor/project.go index b13026e4d0..35ac054744 100644 --- a/pkg/engine/internal/executor/project.go +++ b/pkg/engine/internal/executor/project.go @@ -176,11 +176,8 @@ func newExpandPipeline(expr physical.Expression, evaluator *expressionEvaluator, if idx := slices.IndexFunc(outputFields, func(f arrow.Field) bool { return f.Name == newField.Name }); idx != -1 { - if newField.Name == semconv.ColumnIdentError.FQN() || newField.Name == semconv.ColumnIdentErrorDetails.FQN() { - outputCols[idx] = mergeErrors(outputCols[idx].(*array.String), arrCasted.Field(i).(*array.String)) - } else { - panic(fmt.Sprintf("column duplicates %s", newField.Name)) - } + outputCols[idx] = mergeColumns(outputCols[idx], arrCasted.Field(i)) + outputFields[idx] = newField } else { outputCols = append(outputCols, arrCasted.Field(i)) outputFields = append(outputFields, newField) @@ -199,22 +196,32 @@ func newExpandPipeline(expr physical.Expression, evaluator *expressionEvaluator, }, region, input), nil } -// mergeErrors merges string columns into a semicolon separated list of values. -func mergeErrors(a, b *array.String) *array.String { +// mergeColumns merges two columns by preferring non-null and non-empty values from the new column (b). +// If b has a null or empty value at index i, keep the value from a at that index. +// If b has a non-null and non-empty value at index i, use the value from b (overwriting a). +func mergeColumns(a, b arrow.Array) arrow.Array { + // Only handle string arrays for now (which is what parsers produce) + aStr, aOk := a.(*array.String) + bStr, bOk := b.(*array.String) + + if !aOk || !bOk { + // If not both strings, just return b (overwrite behavior) + return b + } + builder := array.NewStringBuilder(memory.DefaultAllocator) - builder.Reserve(a.Len()) - - for i := range a.Len() { - aVal := a.Value(i) - bVal := b.Value(i) - if bVal != "" { - if aVal != "" { - builder.Append(fmt.Sprintf("%s; %s", aVal, bVal)) + builder.Reserve(aStr.Len()) + + for i := range aStr.Len() { + if bStr.IsNull(i) || bStr.Value(i) == "" { + // New value is null or empty, keep old value + if aStr.IsNull(i) { + builder.AppendNull() } else { - builder.Append(bVal) + builder.Append(aStr.Value(i)) } } else { - builder.Append(aVal) + builder.Append(bStr.Value(i)) } } diff --git a/pkg/engine/internal/executor/project_test.go b/pkg/engine/internal/executor/project_test.go index f87fd7ea21..a5a2d055bb 100644 --- a/pkg/engine/internal/executor/project_test.go +++ b/pkg/engine/internal/executor/project_test.go @@ -393,7 +393,7 @@ func TestNewProjectPipeline_ProjectionFunction_ExpandWithCast(t *testing.T) { }, }, { - name: "existing error columns", + name: "existing error columns - last error wins", schema: arrow.NewSchema([]arrow.Field{ semconv.FieldFromIdent(semconv.ColumnIdentMessage, false), semconv.FieldFromFQN("utf8.parsed.mixed_values", true), @@ -418,8 +418,8 @@ func TestNewProjectPipeline_ProjectionFunction_ExpandWithCast(t *testing.T) { "utf8.generated.__error_details__": ""}, {"utf8.builtin.message": "invalid numeric", "utf8.parsed.mixed_values": "not_a_number", "float64.generated.value": 0.0, - "utf8.generated.__error__": "My error; SampleExtractionErr", - "utf8.generated.__error_details__": `Some error; strconv.ParseFloat: parsing "not_a_number": invalid syntax`}, + "utf8.generated.__error__": types.SampleExtractionErrorType, + "utf8.generated.__error_details__": `strconv.ParseFloat: parsing "not_a_number": invalid syntax`}, }, }, { @@ -762,3 +762,192 @@ func createAmbiguousColumnRef(name string) types.ColumnRef { Type: types.ColumnTypeAmbiguous, } } + +func TestNewProjectPipeline_DuplicateColumnPanic(t *testing.T) { + t.Run("duplicate columns from mixed json and logfmt parsers", func(t *testing.T) { + schema := arrow.NewSchema([]arrow.Field{ + semconv.FieldFromIdent(semconv.ColumnIdentMessage, false), + semconv.FieldFromFQN("utf8.parsed.status", true), + }, nil) + + // Simulate output from JSON parser that extracted "status" field + rows := arrowtest.Rows{ + // Row 1: JSON line that was parsed and extracted "status" + { + "utf8.builtin.message": `{"level":"info","status":200}`, + "utf8.parsed.status": "200", + }, + // Row 2: Logfmt line that will also parse and extract "status" + { + "utf8.builtin.message": `level=info status=404 method=GET`, + "utf8.parsed.status": nil, + semconv.ColumnIdentError.FQN(): "error message", + semconv.ColumnIdentErrorDetails.FQN(): "error details", + }, + // Row 3: Another JSON line with status + { + "utf8.builtin.message": `{"level":"error","status":500}`, + "utf8.parsed.status": "500", + }, + // Row 4: Logfmt line that will not parse and extract "status" + { + "utf8.builtin.message": `level=info method=GET`, + "utf8.parsed.status": nil, + semconv.ColumnIdentError.FQN(): "error message", + semconv.ColumnIdentErrorDetails.FQN(): "error details", + }, + } + + input := NewArrowtestPipeline(schema, rows) + + // Create an expression that parses logfmt from message, which will also extract "status" + // This simulates a query like: | json | logfmt + // where both parsers extract the same field name + evaluator := newExpressionEvaluator() + + parseExpr := &physical.VariadicExpr{ + Op: types.VariadicOpParseLogfmt, + Expressions: []physical.Expression{ + &physical.ColumnExpr{ + Ref: types.ColumnRef{ + Column: "message", + Type: types.ColumnTypeBuiltin, + }, + }, + physical.NewLiteral([]string{}), + physical.NewLiteral(false), + physical.NewLiteral(false), + }, + } + + proj := &physical.Projection{ + Expressions: []physical.Expression{parseExpr}, + All: true, + Expand: true, + } + + pipeline, err := NewProjectPipeline(input, proj, evaluator, nil) + require.NoError(t, err) + + ctx := t.Context() + record, err := pipeline.Read(ctx) + require.NoError(t, err) + + expectedRows := arrowtest.Rows{ + // Row 1: JSON parsed status="200", logfmt fails (null) → keep "200" + { + "utf8.builtin.message": `{"level":"info","status":200}`, + "utf8.generated.__error__": "LogfmtParserErr", + "utf8.generated.__error_details__": "logfmt syntax error at pos 2 : unexpected '\"'", + "utf8.parsed.level": nil, // logfmt didn't parse, so null + "utf8.parsed.method": nil, + "utf8.parsed.status": "200", // Kept from original JSON parse + }, + // Row 2: No previous status (null), logfmt parses status="404" → use "404" + { + "utf8.builtin.message": `level=info status=404 method=GET`, + "utf8.generated.__error__": "", + "utf8.generated.__error_details__": "", + "utf8.parsed.level": "info", + "utf8.parsed.method": "GET", + "utf8.parsed.status": "404", + }, + // Row 3: JSON parsed status="500", logfmt fails (null) → keep "500" + { + "utf8.builtin.message": `{"level":"error","status":500}`, + "utf8.generated.__error__": "LogfmtParserErr", + "utf8.generated.__error_details__": "logfmt syntax error at pos 2 : unexpected '\"'", + "utf8.parsed.level": nil, // logfmt didn't parse, so null + "utf8.parsed.method": nil, + "utf8.parsed.status": "500", // Kept from original JSON parse + }, + // Row 4: No previous status (null), logfmt also does't parse status + { + "utf8.builtin.message": `level=info method=GET`, + "utf8.generated.__error__": "", + "utf8.generated.__error_details__": "", + "utf8.parsed.level": "info", + "utf8.parsed.method": "GET", + "utf8.parsed.status": nil, + }, + } + + actualRows, err := arrowtest.RecordRows(record) + require.NoError(t, err) + require.Equal(t, expectedRows, actualRows) + }) + + t.Run("duplicate columns from same parser multiple times", func(t *testing.T) { + // Another scenario: a field already exists and gets parsed again + schema := arrow.NewSchema([]arrow.Field{ + semconv.FieldFromIdent(semconv.ColumnIdentMessage, false), + semconv.FieldFromFQN("utf8.parsed.user", true), + semconv.FieldFromFQN("utf8.parsed.action", true), + }, nil) + + rows := arrowtest.Rows{ + { + "utf8.builtin.message": `{"user":"alice","action":"login","timestamp":"2024-01-01"}`, + "utf8.parsed.user": "bob", + "utf8.parsed.action": "logout", + }, + { + "utf8.builtin.message": `{"action":"login","timestamp":"2024-01-01"}`, + "utf8.parsed.user": "bob", + "utf8.parsed.action": "logout", + }, + } + + input := NewArrowtestPipeline(schema, rows) + + evaluator := newExpressionEvaluator() + + // Parse JSON from message which will extract "user" and "action" again + parseExpr := &physical.VariadicExpr{ + Op: types.VariadicOpParseJSON, + Expressions: []physical.Expression{ + &physical.ColumnExpr{ + Ref: types.ColumnRef{ + Column: "message", + Type: types.ColumnTypeBuiltin, + }, + }, + physical.NewLiteral([]string{}), + physical.NewLiteral(false), + physical.NewLiteral(false), + }, + } + + proj := &physical.Projection{ + Expressions: []physical.Expression{parseExpr}, + All: true, + Expand: true, + } + + pipeline, err := NewProjectPipeline(input, proj, evaluator, nil) + require.NoError(t, err) + + ctx := t.Context() + record, err := pipeline.Read(ctx) + require.NoError(t, err) + + expectedRows := arrowtest.Rows{ + { + "utf8.builtin.message": `{"user":"alice","action":"login","timestamp":"2024-01-01"}`, + "utf8.parsed.action": "login", + "utf8.parsed.timestamp": "2024-01-01", + "utf8.parsed.user": "alice", + }, + { + "utf8.builtin.message": `{"action":"login","timestamp":"2024-01-01"}`, + "utf8.parsed.action": "login", + "utf8.parsed.timestamp": "2024-01-01", + "utf8.parsed.user": "bob", //from first parse + }, + } + + actualRows, err := arrowtest.RecordRows(record) + require.NoError(t, err) + require.Equal(t, expectedRows, actualRows) + }) +} diff --git a/pkg/engine/internal/planner/logical/logical_optimize.go b/pkg/engine/internal/planner/logical/logical_optimize.go index 071782c621..6e4d952d1b 100644 --- a/pkg/engine/internal/planner/logical/logical_optimize.go +++ b/pkg/engine/internal/planner/logical/logical_optimize.go @@ -194,7 +194,7 @@ func (pass simplifyRegexPass) simplifyBinop(b *BinOp) (simplified []Node, change nodes, changed := pass.simplifyRegex(b.Left, isMessage, reg.Simplify()) - if b.Op == types.BinaryOpNotMatchRe { + if changed && b.Op == types.BinaryOpNotMatchRe { // Add a final instruction to invert the match. nodes = append(nodes, &UnaryOp{ Op: types.UnaryOpNot, diff --git a/pkg/engine/internal/planner/logical/logical_optimize_test.go b/pkg/engine/internal/planner/logical/logical_optimize_test.go index 9408bb131e..e00442ee14 100644 --- a/pkg/engine/internal/planner/logical/logical_optimize_test.go +++ b/pkg/engine/internal/planner/logical/logical_optimize_test.go @@ -146,6 +146,41 @@ RETURN %13 require.Equal(t, expect, actual, "Actual plan:\n%s", actual) } +func Test_simplifyRegexPass_CaseInsensitiveNegate(t *testing.T) { + params, err := logql.NewLiteralParams( + `{region="ap-southeast-1"} !~ "(?i)debug"`, + time.Date(2025, time.January, 1, 0, 0, 0, 0, time.UTC), + time.Date(2025, time.January, 2, 0, 0, 0, 0, time.UTC), + 0 /* step */, 0, /* duration */ + logproto.BACKWARD, + 1000, + []string{"0_of_1"}, + nil, + ) + require.NoError(t, err) + + expect := strings.TrimSpace(` +%1 = EQ label.region "ap-southeast-1" +%2 = NOT_MATCH_RE builtin.message "(?i)debug" +%3 = MAKETABLE [selector=%1, predicates=[%2], shard=0_of_1] +%4 = GTE builtin.timestamp 2025-01-01T00:00:00Z +%5 = SELECT %3 [predicate=%4] +%6 = LT builtin.timestamp 2025-01-02T00:00:00Z +%7 = SELECT %5 [predicate=%6] +%8 = SELECT %7 [predicate=%2] +%9 = TOPK %8 [sort_by=builtin.timestamp, k=1000, asc=false, nulls_first=false] +%10 = LOGQL_COMPAT %9 +RETURN %10 +`) + + p, err := BuildPlan(context.Background(), params) + require.NoError(t, err) + require.NoError(t, Optimize(p), "optimization should not panic on case-insensitive negated regex") + + actual := strings.TrimSpace(p.String()) + require.Equal(t, expect, actual, "Actual plan:\n%s", actual) +} + //go:embed testdata/simplifyRegexPass/*.txtar var simplifyRegexTests embed.FS diff --git a/pkg/logql/bench/generator_query.go b/pkg/logql/bench/generator_query.go index 1988219fda..0de10ce14f 100644 --- a/pkg/logql/bench/generator_query.go +++ b/pkg/logql/bench/generator_query.go @@ -69,6 +69,23 @@ func (c *GeneratorConfig) buildLabelSelector(matchers []labelMatcher) string { return "{" + strings.Join(parts, ", ") + "}" } +func (c *GeneratorConfig) buildRegexLabelSelector(matchers []labelMatcher, caseInsensitive bool) string { + var parts []string + for i, m := range matchers { + if i == 0 { + if caseInsensitive { + parts = append(parts, fmt.Sprintf(`%s=~"(?i)%s"`, m.name, m.value)) + } else { + parts = append(parts, fmt.Sprintf(`%s=~"%s"`, m.name, m.value)) + } + continue + } + + parts = append(parts, fmt.Sprintf(`%s="%s"`, m.name, m.value)) + } + return "{" + strings.Join(parts, ", ") + "}" +} + func (c *GeneratorConfig) generateLabelCombinations() [][]labelMatcher { rnd := c.NewRand() @@ -185,8 +202,13 @@ func (g *TestCaseGenerator) Generate() []TestCase { // Basic label selector queries with line filters and structured metadata if g.cfg.RangeType != "instant" { // log queries only support range type - for _, combo := range labelCombos { + for i, combo := range labelCombos { selector := g.logGenCfg.buildLabelSelector(combo) + if i%3 == 0 { + selector = g.logGenCfg.buildRegexLabelSelector(combo, true) + } else if i%2 == 0 { + selector = g.logGenCfg.buildRegexLabelSelector(combo, false) + } // Basic selector addBidirectional(selector, g.logGenCfg.StartTime, end) @@ -202,6 +224,16 @@ func (g *TestCaseGenerator) Generate() []TestCase { addBidirectional(selector+` |~ "error|exception" | detected_level="error"`, g.logGenCfg.StartTime, end) addBidirectional(selector+` | json | duration_seconds > 0.1 | detected_level!="debug"`, g.logGenCfg.StartTime, end) addBidirectional(selector+` | logfmt | level="error" | detected_level="error"`, g.logGenCfg.StartTime, end) + + // Logs Drilldown queries -- common patterns seen from drilldown + addBidirectional(selector+` |~ "(?i)error"`, g.logGenCfg.StartTime, end) + addBidirectional(selector+` !~ "(?i)debug"`, g.logGenCfg.StartTime, end) + addBidirectional(selector+` | json | logfmt | drop __error__, __error_details__`, g.logGenCfg.StartTime, end) + addBidirectional(selector+` | json | logfmt | drop __error__, __error_details__ | level="error"`, g.logGenCfg.StartTime, end) + addBidirectional(selector+` | json | logfmt | drop __error__, __error_details__ | level="error" or level="warn"`, g.logGenCfg.StartTime, end) + addBidirectional(selector+` |~ "(?i)error" | json | logfmt | drop __error__, __error_details__ | status_code >= 500`, g.logGenCfg.StartTime, end) + addBidirectional(selector+` | detected_level="error" |~ "(?i).*timeout.*"`, g.logGenCfg.StartTime, end) + addBidirectional(selector+` | detected_level=~"error|warn" |~ "(?i)exception"`, g.logGenCfg.StartTime, end) } } @@ -258,6 +290,10 @@ func (g *TestCaseGenerator) Generate() []TestCase { addMetricQuery(fmt.Sprintf(`avg_over_time({service_name="loki"} | logfmt | duration != "" | unwrap duration_seconds(duration) [%s])`, rangeInterval), start, end, step) addMetricQuery(fmt.Sprintf(`sum_over_time({service_name="database"} | json | unwrap rows_affected [%s])`, rangeInterval), start, end, step) + // Logs Drilldown style unwrap query (from fields tab) + addMetricQuery(fmt.Sprintf(`sum by (detected_level) (avg_over_time({service_name="loki"} | json | logfmt | duration != "" | drop __error__, __error_details__ | unwrap duration_seconds(duration) [%s]))`, rangeInterval), start, end, step) + addMetricQuery(fmt.Sprintf(`sum by (detected_level) (count_over_time({service_name=~"(?i)loki"} | detected_level="debug" or detected_level="info" or detected_level="warn" |~ "(?i)(?i)duration" | json | logfmt | drop __error__, __error_details__ | level=~"(?i)INFO" [%s]))`, rangeInterval), start, end, step) + // Dense period queries for _, interval := range g.logGenCfg.DenseIntervals { combo := labelCombos[g.logGenCfg.NewRand().Intn(len(labelCombos))]