Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/engine/compat_test.go

995 lines
38 KiB

package engine
import (
"context"
"testing"
"time"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/engine/internal/executor"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logqlmodel"
"github.com/grafana/loki/v3/pkg/logqlmodel/metadata"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
"github.com/grafana/loki/v3/pkg/util/arrowtest"
"github.com/grafana/loki/pkg/push"
)
func TestStreamsResultBuilder(t *testing.T) {
t.Run("empty builder returns non-nil result", func(t *testing.T) {
builder := newStreamsResultBuilder(logproto.BACKWARD, false)
md, _ := metadata.NewContext(t.Context())
require.NotNil(t, builder.Build(stats.Result{}, md).Data)
})
t.Run("rows without timestamp, or labels are ignored", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colMsg := semconv.ColumnIdentMessage
colEnv := semconv.NewIdentifier("env", types.ColumnTypeMetadata, types.Loki.String)
schema := arrow.NewSchema(
[]arrow.Field{
semconv.FieldFromIdent(colTs, true), // in practice timestamp is not nullable!
semconv.FieldFromIdent(colMsg, true), // in practice message is not nullable!
semconv.FieldFromIdent(colEnv, true),
},
nil,
)
rows := arrowtest.Rows{
{
colTs.FQN(): time.Unix(0, 1620000000000000001).UTC(),
colMsg.FQN(): nil,
colEnv.FQN(): "prod",
},
{
colTs.FQN(): nil,
colMsg.FQN(): "log line",
colEnv.FQN(): "prod",
},
{
colTs.FQN(): time.Unix(0, 1620000000000000003).UTC(),
colMsg.FQN(): "log line",
colEnv.FQN(): nil,
},
}
record := rows.Record(memory.DefaultAllocator, schema)
pipeline := executor.NewBufferedPipeline(record)
defer pipeline.Close()
builder := newStreamsResultBuilder(logproto.BACKWARD, false)
err := collectResult(context.Background(), pipeline, builder)
require.NoError(t, err)
require.Equal(t, 1, builder.Len(), "expected 1 entry to be collected")
})
t.Run("successful conversion of labels, log line, timestamp, and structured metadata ", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colMsg := semconv.ColumnIdentMessage
colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
colNs := semconv.NewIdentifier("namespace", types.ColumnTypeLabel, types.Loki.String)
colTid := semconv.NewIdentifier("traceID", types.ColumnTypeMetadata, types.Loki.String)
schema := arrow.NewSchema(
[]arrow.Field{
semconv.FieldFromIdent(colTs, false),
semconv.FieldFromIdent(colMsg, false),
semconv.FieldFromIdent(colEnv, true),
semconv.FieldFromIdent(colNs, true),
semconv.FieldFromIdent(colTid, true),
},
nil,
)
rows := arrowtest.Rows{
{
colTs.FQN(): nil,
colMsg.FQN(): "log line 0 (must be skipped)",
colEnv.FQN(): "dev",
colNs.FQN(): "loki-dev-001",
colTid.FQN(): "860e403fcf754312",
},
{
colTs.FQN(): time.Unix(0, 1620000000000000001).UTC(),
colMsg.FQN(): "log line 1",
colEnv.FQN(): "dev",
colNs.FQN(): "loki-dev-001",
colTid.FQN(): "860e403fcf754312",
},
{
colTs.FQN(): time.Unix(0, 1620000000000000002).UTC(),
colMsg.FQN(): "log line 2",
colEnv.FQN(): "prod",
colNs.FQN(): "loki-prod-001",
colTid.FQN(): "46ce02549441e41c",
},
{
colTs.FQN(): time.Unix(0, 1620000000000000003).UTC(),
colMsg.FQN(): "log line 3",
colEnv.FQN(): "dev",
colNs.FQN(): "loki-dev-002",
colTid.FQN(): "61330481e1e59b18",
},
{
colTs.FQN(): time.Unix(0, 1620000000000000004).UTC(),
colMsg.FQN(): "log line 4",
colEnv.FQN(): "prod",
colNs.FQN(): "loki-prod-001",
colTid.FQN(): "40e50221e284b9d2",
},
{
colTs.FQN(): time.Unix(0, 1620000000000000005).UTC(),
colMsg.FQN(): "log line 5",
colEnv.FQN(): "dev",
colNs.FQN(): "loki-dev-002",
colTid.FQN(): "0cf883f112ad239b",
},
{
colTs.FQN(): time.Unix(0, 1620000000000000006).UTC(),
colMsg.FQN(): "log line 6",
colEnv.FQN(): "dev",
colNs.FQN(): nil,
colTid.FQN(): "9de325g124ad230b",
},
}
record := rows.Record(memory.DefaultAllocator, schema)
pipeline := executor.NewBufferedPipeline(record)
defer pipeline.Close()
builder := newStreamsResultBuilder(logproto.BACKWARD, false)
err := collectResult(context.Background(), pipeline, builder)
require.NoError(t, err)
require.Equal(t, 6, builder.Len())
md, _ := metadata.NewContext(t.Context())
result := builder.Build(stats.Result{}, md)
require.Equal(t, 6, result.Data.(logqlmodel.Streams).Len())
expected := logqlmodel.Streams{
push.Stream{
Labels: labels.FromStrings("env", "dev", "namespace", "loki-dev-001", "traceID", "860e403fcf754312").String(),
Entries: []logproto.Entry{
{Line: "log line 1", Timestamp: time.Unix(0, 1620000000000000001), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "860e403fcf754312")), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
},
},
push.Stream{
Labels: labels.FromStrings("env", "dev", "namespace", "loki-dev-002", "traceID", "0cf883f112ad239b").String(),
Entries: []logproto.Entry{
{Line: "log line 5", Timestamp: time.Unix(0, 1620000000000000005), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "0cf883f112ad239b")), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
},
},
push.Stream{
Labels: labels.FromStrings("env", "dev", "namespace", "loki-dev-002", "traceID", "61330481e1e59b18").String(),
Entries: []logproto.Entry{
{Line: "log line 3", Timestamp: time.Unix(0, 1620000000000000003), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "61330481e1e59b18")), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
},
},
push.Stream{
Labels: labels.FromStrings("env", "dev", "traceID", "9de325g124ad230b").String(),
Entries: []logproto.Entry{
{Line: "log line 6", Timestamp: time.Unix(0, 1620000000000000006), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "9de325g124ad230b")), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
},
},
push.Stream{
Labels: labels.FromStrings("env", "prod", "namespace", "loki-prod-001", "traceID", "40e50221e284b9d2").String(),
Entries: []logproto.Entry{
{Line: "log line 4", Timestamp: time.Unix(0, 1620000000000000004), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "40e50221e284b9d2")), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
},
},
push.Stream{
Labels: labels.FromStrings("env", "prod", "namespace", "loki-prod-001", "traceID", "46ce02549441e41c").String(),
Entries: []logproto.Entry{
{Line: "log line 2", Timestamp: time.Unix(0, 1620000000000000002), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "46ce02549441e41c")), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
},
},
}
require.Equal(t, expected, result.Data.(logqlmodel.Streams))
})
t.Run("multiple records with different streams are accumulated correctly", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colMsg := semconv.ColumnIdentMessage
colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
schema := arrow.NewSchema(
[]arrow.Field{
semconv.FieldFromIdent(colTs, false),
semconv.FieldFromIdent(colMsg, false),
semconv.FieldFromIdent(colEnv, false),
},
nil,
)
// First record: prod and dev streams
rows1 := arrowtest.Rows{
{
colTs.FQN(): time.Unix(0, 1620000000000000001).UTC(),
colMsg.FQN(): "log line 1",
colEnv.FQN(): "prod",
},
{
colTs.FQN(): time.Unix(0, 1620000000000000002).UTC(),
colMsg.FQN(): "log line 2",
colEnv.FQN(): "dev",
},
}
record1 := rows1.Record(memory.DefaultAllocator, schema)
defer record1.Release()
// Second record: prod and staging streams
rows2 := arrowtest.Rows{
{
colTs.FQN(): time.Unix(0, 1620000000000000003).UTC(),
colMsg.FQN(): "log line 3",
colEnv.FQN(): "prod",
},
{
colTs.FQN(): time.Unix(0, 1620000000000000004).UTC(),
colMsg.FQN(): "log line 4",
colEnv.FQN(): "staging",
},
}
record2 := rows2.Record(memory.DefaultAllocator, schema)
defer record2.Release()
builder := newStreamsResultBuilder(logproto.FORWARD, false)
// Collect first record
builder.CollectRecord(record1)
require.Equal(t, 2, builder.Len(), "should have 2 entries after first record")
// Collect second record
builder.CollectRecord(record2)
require.Equal(t, 4, builder.Len(), "should have 4 entries total after second record")
md, _ := metadata.NewContext(t.Context())
result := builder.Build(stats.Result{}, md)
streams := result.Data.(logqlmodel.Streams)
// Note: 3 unique streams (dev, prod, staging), but 4 total entries
// The prod stream has 2 entries (one from each record)
require.Equal(t, 3, len(streams), "should have 3 unique streams")
// Verify stream grouping - prod stream should have entries from both records
expected := logqlmodel.Streams{
push.Stream{
Labels: labels.FromStrings("env", "dev").String(),
Entries: []logproto.Entry{
{Line: "log line 2", Timestamp: time.Unix(0, 1620000000000000002), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.Labels{}), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
},
},
push.Stream{
Labels: labels.FromStrings("env", "prod").String(),
Entries: []logproto.Entry{
{Line: "log line 1", Timestamp: time.Unix(0, 1620000000000000001), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.Labels{}), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
{Line: "log line 3", Timestamp: time.Unix(0, 1620000000000000003), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.Labels{}), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
},
},
push.Stream{
Labels: labels.FromStrings("env", "staging").String(),
Entries: []logproto.Entry{
{Line: "log line 4", Timestamp: time.Unix(0, 1620000000000000004), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.Labels{}), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
},
},
}
require.Equal(t, expected, streams)
})
t.Run("buffer reuse with varying record sizes", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colMsg := semconv.ColumnIdentMessage
colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
schema := arrow.NewSchema(
[]arrow.Field{
semconv.FieldFromIdent(colTs, false),
semconv.FieldFromIdent(colMsg, false),
semconv.FieldFromIdent(colEnv, false),
},
nil,
)
builder := newStreamsResultBuilder(logproto.BACKWARD, false)
// First record: 5 rows (buffer grows to 5)
rows1 := make(arrowtest.Rows, 5)
for i := 0; i < 5; i++ {
rows1[i] = arrowtest.Row{
colTs.FQN(): time.Unix(0, int64(1620000000000000001+i)).UTC(),
colMsg.FQN(): "log line",
colEnv.FQN(): "prod",
}
}
record1 := rows1.Record(memory.DefaultAllocator, schema)
builder.CollectRecord(record1)
record1.Release()
require.Equal(t, 5, builder.Len())
require.Equal(t, 5, len(builder.rowBuilders), "buffer should have 5 rowBuilders")
// Second record: 2 rows (buffer shrinks to 2)
rows2 := make(arrowtest.Rows, 2)
for i := 0; i < 2; i++ {
rows2[i] = arrowtest.Row{
colTs.FQN(): time.Unix(0, int64(1620000000000000010+i)).UTC(),
colMsg.FQN(): "log line",
colEnv.FQN(): "dev",
}
}
record2 := rows2.Record(memory.DefaultAllocator, schema)
builder.CollectRecord(record2)
record2.Release()
require.Equal(t, 7, builder.Len())
require.Equal(t, 2, len(builder.rowBuilders), "buffer should shrink to 2 rowBuilders")
// Third record: 10 rows (buffer grows to 10)
rows3 := make(arrowtest.Rows, 10)
for i := 0; i < 10; i++ {
rows3[i] = arrowtest.Row{
colTs.FQN(): time.Unix(0, int64(1620000000000000020+i)).UTC(),
colMsg.FQN(): "log line",
colEnv.FQN(): "staging",
}
}
record3 := rows3.Record(memory.DefaultAllocator, schema)
builder.CollectRecord(record3)
record3.Release()
require.Equal(t, 17, builder.Len())
require.Equal(t, 10, len(builder.rowBuilders), "buffer should grow to 10 rowBuilders")
// Verify all rowBuilders are properly initialized
for i := 0; i < len(builder.rowBuilders); i++ {
require.NotNil(t, builder.rowBuilders[i].lbsBuilder, "lbsBuilder should be initialized")
require.NotNil(t, builder.rowBuilders[i].metadataBuilder, "metadataBuilder should be initialized")
require.NotNil(t, builder.rowBuilders[i].parsedBuilder, "parsedBuilder should be initialized")
require.Equal(t, 0, len(builder.rowBuilders[i].parsedEmptyKeys), "parsedEmptyKeys should be empty")
}
})
t.Run("empty records mixed with valid records", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colMsg := semconv.ColumnIdentMessage
colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
schema := arrow.NewSchema(
[]arrow.Field{
semconv.FieldFromIdent(colTs, false),
semconv.FieldFromIdent(colMsg, false),
semconv.FieldFromIdent(colEnv, false),
},
nil,
)
builder := newStreamsResultBuilder(logproto.BACKWARD, false)
// First record: 3 valid rows
rows1 := make(arrowtest.Rows, 3)
for i := 0; i < 3; i++ {
rows1[i] = arrowtest.Row{
colTs.FQN(): time.Unix(0, int64(1620000000000000001+i)).UTC(),
colMsg.FQN(): "log line",
colEnv.FQN(): "prod",
}
}
record1 := rows1.Record(memory.DefaultAllocator, schema)
builder.CollectRecord(record1)
record1.Release()
require.Equal(t, 3, builder.Len())
// Second record: empty (0 rows)
rows2 := arrowtest.Rows{}
record2 := rows2.Record(memory.DefaultAllocator, schema)
builder.CollectRecord(record2)
record2.Release()
require.Equal(t, 3, builder.Len(), "empty record should not change count")
// Third record: 2 valid rows
rows3 := make(arrowtest.Rows, 2)
for i := 0; i < 2; i++ {
rows3[i] = arrowtest.Row{
colTs.FQN(): time.Unix(0, int64(1620000000000000010+i)).UTC(),
colMsg.FQN(): "log line",
colEnv.FQN(): "dev",
}
}
record3 := rows3.Record(memory.DefaultAllocator, schema)
builder.CollectRecord(record3)
record3.Release()
require.Equal(t, 5, builder.Len(), "should have 5 total entries")
// Verify final result
md, _ := metadata.NewContext(t.Context())
result := builder.Build(stats.Result{}, md)
streams := result.Data.(logqlmodel.Streams)
// Note: 2 unique streams (prod with 3 entries, dev with 2 entries) = 5 total entries
require.Equal(t, 2, len(streams), "should have 2 unique streams")
// Verify the streams have the correct number of entries
var totalEntries int
for _, stream := range streams {
totalEntries += len(stream.Entries)
}
require.Equal(t, 5, totalEntries, "should have 5 total entries across both streams")
})
t.Run("parsed empty values are added to the parsed labels", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colMsg := semconv.ColumnIdentMessage
colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
colMetadata := semconv.NewIdentifier("metadata", types.ColumnTypeMetadata, types.Loki.String)
colParsedA := semconv.NewIdentifier("Aparsed", types.ColumnTypeParsed, types.Loki.String)
colParsedZ := semconv.NewIdentifier("Zparsed", types.ColumnTypeParsed, types.Loki.String)
schema := arrow.NewSchema(
[]arrow.Field{
semconv.FieldFromIdent(colTs, false),
semconv.FieldFromIdent(colMsg, false),
semconv.FieldFromIdent(colEnv, false),
semconv.FieldFromIdent(colMetadata, false),
semconv.FieldFromIdent(colParsedA, false),
semconv.FieldFromIdent(colParsedZ, false),
},
nil,
)
rows := arrowtest.Rows{
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colMsg.FQN(): "log line", colEnv.FQN(): "prod", colMetadata.FQN(): "md value", colParsedA.FQN(): "A", colParsedZ.FQN(): "Z"},
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colMsg.FQN(): "log line", colEnv.FQN(): "prod", colMetadata.FQN(): "", colParsedA.FQN(): "", colParsedZ.FQN(): ""},
}
record := rows.Record(memory.DefaultAllocator, schema)
builder := newStreamsResultBuilder(logproto.BACKWARD, false)
builder.CollectRecord(record)
record.Release()
require.Equal(t, 2, builder.Len())
md, _ := metadata.NewContext(t.Context())
result := builder.Build(stats.Result{}, md)
streams := result.Data.(logqlmodel.Streams)
require.Equal(t, 2, len(streams), "should have 2 unique streams")
expected := logqlmodel.Streams{
push.Stream{
Labels: labels.FromStrings("Aparsed", "", "env", "prod", "Zparsed", "").String(),
Entries: []logproto.Entry{
{Line: "log line", Timestamp: time.Unix(0, 1620000000000000000), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.Labels{}), Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("Aparsed", "", "Zparsed", ""))},
},
},
push.Stream{
Labels: labels.FromStrings("Aparsed", "A", "env", "prod", "metadata", "md value", "Zparsed", "Z").String(),
Entries: []logproto.Entry{
{Line: "log line", Timestamp: time.Unix(0, 1620000000000000000), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("metadata", "md value")), Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("Aparsed", "A", "Zparsed", "Z"))},
},
},
}
require.Equal(t, expected, streams)
})
t.Run("labels with empty values are dropped from stream labels", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colMsg := semconv.ColumnIdentMessage
colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
colRegion := semconv.NewIdentifier("region", types.ColumnTypeLabel, types.Loki.String)
schema := arrow.NewSchema(
[]arrow.Field{
semconv.FieldFromIdent(colTs, false),
semconv.FieldFromIdent(colMsg, false),
semconv.FieldFromIdent(colEnv, true),
semconv.FieldFromIdent(colRegion, true),
},
nil,
)
rows := arrowtest.Rows{
{
colTs.FQN(): time.Unix(0, 1620000000000000001).UTC(),
colMsg.FQN(): "log line 1",
colEnv.FQN(): "prod",
colRegion.FQN(): "us-west",
},
{
colTs.FQN(): time.Unix(0, 1620000000000000002).UTC(),
colMsg.FQN(): "log line 2",
colEnv.FQN(): "prod",
colRegion.FQN(): "",
},
{
colTs.FQN(): time.Unix(0, 1620000000000000003).UTC(),
colMsg.FQN(): "log line 3",
colEnv.FQN(): "",
colRegion.FQN(): "us-east",
},
}
record := rows.Record(memory.DefaultAllocator, schema)
pipeline := executor.NewBufferedPipeline(record)
defer pipeline.Close()
builder := newStreamsResultBuilder(logproto.FORWARD, false)
err := collectResult(context.Background(), pipeline, builder)
require.NoError(t, err)
require.Equal(t, 3, builder.Len())
md, _ := metadata.NewContext(t.Context())
result := builder.Build(stats.Result{}, md)
streams := result.Data.(logqlmodel.Streams)
require.Equal(t, 3, len(streams), "should have 3 unique streams (empty label values dropped)")
streamLabels := make([]string, len(streams))
for i, s := range streams {
streamLabels[i] = s.Labels
}
require.Contains(t, streamLabels, labels.FromStrings("env", "prod", "region", "us-west").String())
require.Contains(t, streamLabels, labels.FromStrings("env", "prod").String())
require.Contains(t, streamLabels, labels.FromStrings("region", "us-east").String())
})
t.Run("categorize labels does not consider metadata or parsed keys when building output streams", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colMsg := semconv.ColumnIdentMessage
colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
colMetadata := semconv.NewIdentifier("metadata", types.ColumnTypeMetadata, types.Loki.String)
colParsed := semconv.NewIdentifier("parsed", types.ColumnTypeParsed, types.Loki.String)
schema := arrow.NewSchema(
[]arrow.Field{
semconv.FieldFromIdent(colTs, false),
semconv.FieldFromIdent(colMsg, false),
semconv.FieldFromIdent(colEnv, false),
semconv.FieldFromIdent(colMetadata, false),
semconv.FieldFromIdent(colParsed, false),
},
nil,
)
rows := arrowtest.Rows{
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colMsg.FQN(): "log line", colEnv.FQN(): "prod", colMetadata.FQN(): "a md value", colParsed.FQN(): "a parsed value"},
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colMsg.FQN(): "log line", colEnv.FQN(): "prod", colMetadata.FQN(): "another md value", colParsed.FQN(): "another parsed value"},
}
record := rows.Record(memory.DefaultAllocator, schema)
builder := newStreamsResultBuilder(logproto.BACKWARD, true)
builder.CollectRecord(record)
record.Release()
require.Equal(t, 2, builder.Len())
md, _ := metadata.NewContext(t.Context())
result := builder.Build(stats.Result{}, md)
streams := result.Data.(logqlmodel.Streams)
require.Equal(t, 1, len(streams), "should have 1 unique stream")
expected := logqlmodel.Streams{
push.Stream{
Labels: labels.FromStrings("env", "prod").String(),
Entries: []logproto.Entry{
{Line: "log line", Timestamp: time.Unix(0, 1620000000000000000), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("metadata", "a md value")), Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("parsed", "a parsed value"))},
{Line: "log line", Timestamp: time.Unix(0, 1620000000000000000), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("metadata", "another md value")), Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("parsed", "another parsed value"))},
},
},
}
require.Equal(t, expected, streams)
})
t.Run("duplicate entries from multiple records are deduplicated", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colMsg := semconv.ColumnIdentMessage
colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
schema := arrow.NewSchema(
[]arrow.Field{
semconv.FieldFromIdent(colTs, false),
semconv.FieldFromIdent(colMsg, false),
semconv.FieldFromIdent(colEnv, false),
},
nil,
)
ts1 := time.Unix(0, 1620000000000000001).UTC()
ts2 := time.Unix(0, 1620000000000000002).UTC()
ts3 := time.Unix(0, 1620000000000000003).UTC()
// Simulate two data object scans returning overlapping entries
// (same entry stored in multiple data objects).
rec1 := arrowtest.Rows{
{colTs.FQN(): ts1, colMsg.FQN(): "line A", colEnv.FQN(): "prod"},
{colTs.FQN(): ts2, colMsg.FQN(): "line B", colEnv.FQN(): "prod"},
{colTs.FQN(): ts3, colMsg.FQN(): "line C", colEnv.FQN(): "prod"},
}.Record(memory.DefaultAllocator, schema)
rec2 := arrowtest.Rows{
{colTs.FQN(): ts2, colMsg.FQN(): "line B", colEnv.FQN(): "prod"},
{colTs.FQN(): ts3, colMsg.FQN(): "line C", colEnv.FQN(): "prod"},
}.Record(memory.DefaultAllocator, schema)
builder := newStreamsResultBuilder(logproto.FORWARD, false)
builder.CollectRecord(rec1)
builder.CollectRecord(rec2)
rec1.Release()
rec2.Release()
require.Equal(t, 5, builder.Len(), "raw count before dedup")
md, _ := metadata.NewContext(t.Context())
result := builder.Build(stats.Result{}, md)
streams := result.Data.(logqlmodel.Streams)
require.Equal(t, 1, len(streams), "should have 1 unique stream")
require.Equal(t, 3, len(streams[0].Entries), "duplicates should be removed")
require.Equal(t, int64(3), result.Statistics.Summary.TotalEntriesReturned,
"stats should reflect post-dedup count")
expected := logqlmodel.Streams{
push.Stream{
Labels: labels.FromStrings("env", "prod").String(),
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, ts1.UnixNano()), Line: "line A"},
{Timestamp: time.Unix(0, ts2.UnixNano()), Line: "line B"},
{Timestamp: time.Unix(0, ts3.UnixNano()), Line: "line C"},
},
},
}
require.Equal(t, expected, streams)
})
// Regression test: entries with different __error__ values must NOT be split into separate streams.
//
// BUG: When categorizeLabels=false, the new engine was incorrectly using __error__ and
// __error_details__ for stream GROUPING. This caused entries with different error messages
// to be placed in separate streams.
//
// Example: A query like `{app="foo"} | logfmt` parsing logs with invalid logfmt syntax
// would produce errors like "logfmt syntax error at pos 74" and "logfmt syntax error at pos 75".
// The old engine correctly keeps all entries in one stream, but the buggy new engine would
// create separate streams based on the different error details.
//
// The fix ensures error labels are excluded from the stream grouping key, matching classic
// Loki engine behavior. Error labels are still included in stream labels (when the first
// entry has errors) and in parsed labels (for all entries with errors).
t.Run("regression: error labels must not cause stream splitting", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colMsg := semconv.ColumnIdentMessage
colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
colError := semconv.NewIdentifier(types.ColumnNameError, types.ColumnTypeGenerated, types.Loki.String)
colErrorDetails := semconv.NewIdentifier(types.ColumnNameErrorDetails, types.ColumnTypeGenerated, types.Loki.String)
schema := arrow.NewSchema(
[]arrow.Field{
semconv.FieldFromIdent(colTs, false),
semconv.FieldFromIdent(colMsg, false),
semconv.FieldFromIdent(colEnv, false),
semconv.FieldFromIdent(colError, true),
semconv.FieldFromIdent(colErrorDetails, true),
},
nil,
)
rows := arrowtest.Rows{
// Entry 1: logfmt parse error at position 74 (first entry, sets stream labels)
{
colTs.FQN(): time.Unix(0, 1620000000000000001).UTC(),
colMsg.FQN(): "invalid {json broken at pos 74",
colEnv.FQN(): "prod",
colError.FQN(): "LogfmtParserErr",
colErrorDetails.FQN(): "logfmt syntax error at pos 74",
},
// Entry 2: logfmt parse error at position 75 (DIFFERENT error details)
// BUG: This entry was incorrectly placed in a separate stream due to different __error_details__
{
colTs.FQN(): time.Unix(0, 1620000000000000002).UTC(),
colMsg.FQN(): "invalid {json broken at pos 75",
colEnv.FQN(): "prod",
colError.FQN(): "LogfmtParserErr",
colErrorDetails.FQN(): "logfmt syntax error at pos 75",
},
// Entry 3: logfmt parse error at position 76 (yet another DIFFERENT error details)
{
colTs.FQN(): time.Unix(0, 1620000000000000003).UTC(),
colMsg.FQN(): "invalid {json broken at pos 76",
colEnv.FQN(): "prod",
colError.FQN(): "LogfmtParserErr",
colErrorDetails.FQN(): "logfmt syntax error at pos 76",
},
}
record := rows.Record(memory.DefaultAllocator, schema)
defer record.Release()
// categorizeLabels=false is the default API behavior where parsed labels
// are merged into stream labels.
builder := newStreamsResultBuilder(logproto.FORWARD, false)
builder.CollectRecord(record)
require.Equal(t, 3, builder.Len(), "should have 3 entries")
md, _ := metadata.NewContext(t.Context())
result := builder.Build(stats.Result{}, md)
streams := result.Data.(logqlmodel.Streams)
// CRITICAL ASSERTION: All 3 entries with different error details must be in ONE stream.
// Before the fix, this would fail with len(streams) == 3 (each entry in its own stream
// due to different __error_details__ values being used for grouping).
require.Equal(t, 1, len(streams), "BUG: error labels caused stream splitting - expected 1 stream, got %d", len(streams))
require.Equal(t, 3, len(streams[0].Entries), "all 3 entries should be in the same stream")
// Verify error labels ARE present in stream labels (set by the first entry)
streamLabels := streams[0].Labels
require.Contains(t, streamLabels, types.ColumnNameError,
"stream labels should contain __error__ (from first entry)")
// Verify error labels ARE present in the parsed labels of each entry
// Helper to find a label in parsed labels
findParsedLabel := func(entry logproto.Entry, name string) (string, bool) {
for _, l := range entry.Parsed {
if l.Name == name {
return l.Value, true
}
}
return "", false
}
// All entries should have __error__ in parsed labels
for i, entry := range streams[0].Entries {
errVal, found := findParsedLabel(entry, types.ColumnNameError)
require.True(t, found, "entry %d should have __error__ in parsed labels", i)
require.Equal(t, "LogfmtParserErr", errVal)
// Each entry should have its own __error_details__ value
_, found = findParsedLabel(entry, types.ColumnNameErrorDetails)
require.True(t, found, "entry %d should have __error_details__ in parsed labels", i)
}
})
}
func TestVectorResultBuilder(t *testing.T) {
t.Run("empty builder returns non-nil result", func(t *testing.T) {
builder := newVectorResultBuilder()
md, _ := metadata.NewContext(t.Context())
require.NotNil(t, builder.Build(stats.Result{}, md).Data)
})
t.Run("successful conversion of vector data", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colVal := semconv.ColumnIdentValue
colInst := semconv.NewIdentifier("instance", types.ColumnTypeMetadata, types.Loki.String)
colJob := semconv.NewIdentifier("job", types.ColumnTypeMetadata, types.Loki.String)
schema := arrow.NewSchema(
[]arrow.Field{
semconv.FieldFromIdent(colTs, false),
semconv.FieldFromIdent(colVal, false),
semconv.FieldFromIdent(colInst, false),
semconv.FieldFromIdent(colJob, false),
},
nil,
)
rows := arrowtest.Rows{
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colVal.FQN(): float64(42), colInst.FQN(): "localhost:9090", colJob.FQN(): "prometheus"},
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colVal.FQN(): float64(23), colInst.FQN(): "localhost:9100", colJob.FQN(): "node-exporter"},
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colVal.FQN(): float64(32), colInst.FQN(): nil, colJob.FQN(): "node-exporter"},
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colVal.FQN(): float64(15), colInst.FQN(): "localhost:9100", colJob.FQN(): "prometheus"},
}
record := rows.Record(memory.DefaultAllocator, schema)
pipeline := executor.NewBufferedPipeline(record)
defer pipeline.Close()
builder := newVectorResultBuilder()
err := collectResult(context.Background(), pipeline, builder)
require.NoError(t, err)
require.Equal(t, 4, builder.Len())
md, _ := metadata.NewContext(t.Context())
result := builder.Build(stats.Result{}, md)
vector := result.Data.(promql.Vector)
require.Equal(t, 4, len(vector))
expected := promql.Vector{
{
T: int64(1620000000000),
F: 42.0,
Metric: labels.FromStrings("instance", "localhost:9090", "job", "prometheus"),
},
{
T: int64(1620000000000),
F: 23.0,
Metric: labels.FromStrings("instance", "localhost:9100", "job", "node-exporter"),
},
{
T: int64(1620000000000),
F: 15.0,
Metric: labels.FromStrings("instance", "localhost:9100", "job", "prometheus"),
},
{
T: int64(1620000000000),
F: 32.0,
Metric: labels.FromStrings("job", "node-exporter"),
},
}
require.Equal(t, expected, vector)
})
t.Run("empty parsed label values are preserved in vector results", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colVal := semconv.ColumnIdentValue
colEnv := semconv.NewIdentifier("env", types.ColumnTypeParsed, types.Loki.String)
colScheme := semconv.NewIdentifier("http_url_scheme", types.ColumnTypeParsed, types.Loki.String)
schema := arrow.NewSchema(
[]arrow.Field{
semconv.FieldFromIdent(colTs, false),
semconv.FieldFromIdent(colVal, false),
semconv.FieldFromIdent(colEnv, true),
semconv.FieldFromIdent(colScheme, true),
},
nil,
)
ts := time.Unix(0, 1620000000000000000).UTC()
rows := arrowtest.Rows{
// Non-empty parsed label value — no change in behaviour.
{colTs.FQN(): ts, colVal.FQN(): float64(1), colEnv.FQN(): "prod", colScheme.FQN(): "https"},
// Empty parsed label value (e.g. `| json` parsed http_url_scheme as "").
// Must appear in the result with value "" rather than being absent.
{colTs.FQN(): ts, colVal.FQN(): float64(2), colEnv.FQN(): "dev", colScheme.FQN(): ""},
// Absent parsed label (NULL — aggregator called AppendNull for this series).
// Must NOT appear in the result at all.
{colTs.FQN(): ts, colVal.FQN(): float64(3), colEnv.FQN(): "staging", colScheme.FQN(): nil},
}
record := rows.Record(memory.DefaultAllocator, schema)
pipeline := executor.NewBufferedPipeline(record)
defer pipeline.Close()
builder := newVectorResultBuilder()
err := collectResult(context.Background(), pipeline, builder)
require.NoError(t, err)
require.Equal(t, 3, builder.Len())
md, _ := metadata.NewContext(t.Context())
result := builder.Build(stats.Result{}, md)
vector := result.Data.(promql.Vector)
samples := make(map[string]*promql.Sample, len(vector))
for i := range vector {
env := vector[i].Metric.Get("env")
samples[env] = &vector[i]
}
// prod: non-empty value, must be present as-is.
require.NotNil(t, samples["prod"], "expected a sample with env=prod")
require.Equal(t, "https", samples["prod"].Metric.Get("http_url_scheme"))
// dev: empty string value, must be present in the label set.
require.NotNil(t, samples["dev"], "expected a sample with env=dev")
devHasScheme := false
samples["dev"].Metric.Range(func(l labels.Label) {
if l.Name == "http_url_scheme" {
devHasScheme = true
require.Equal(t, "", l.Value)
}
})
require.True(t, devHasScheme, "dev: http_url_scheme with empty value must be present in the label set")
// staging: absent label (NULL), must not appear at all.
require.NotNil(t, samples["staging"], "expected a sample with env=staging")
stagingHasScheme := false
samples["staging"].Metric.Range(func(l labels.Label) {
if l.Name == "http_url_scheme" {
stagingHasScheme = true
}
})
require.False(t, stagingHasScheme, "staging: absent label (NULL) must not appear in the label set")
})
// TODO:(ashwanth) also enforce grouping labels are all present?
t.Run("rows without timestamp or value are ignored", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colVal := semconv.ColumnIdentValue
colInst := semconv.NewIdentifier("instance", types.ColumnTypeMetadata, types.Loki.String)
schema := arrow.NewSchema(
[]arrow.Field{
semconv.FieldFromIdent(colTs, false),
semconv.FieldFromIdent(colVal, false),
semconv.FieldFromIdent(colInst, false),
},
nil,
)
rows := arrowtest.Rows{
{colTs.FQN(): nil, colVal.FQN(): float64(42), colInst.FQN(): "localhost:9090"},
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colVal.FQN(): nil, colInst.FQN(): "localhost:9100"},
}
record := rows.Record(memory.DefaultAllocator, schema)
pipeline := executor.NewBufferedPipeline(record)
defer pipeline.Close()
builder := newVectorResultBuilder()
err := collectResult(context.Background(), pipeline, builder)
require.NoError(t, err)
require.Equal(t, 0, builder.Len(), "expected no samples to be collected")
})
}
func TestMatrixResultBuilder(t *testing.T) {
t.Run("empty builder returns non-nil result", func(t *testing.T) {
builder := newMatrixResultBuilder()
md, _ := metadata.NewContext(t.Context())
require.NotNil(t, builder.Build(stats.Result{}, md).Data)
})
t.Run("successful conversion of matrix data", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colVal := semconv.ColumnIdentValue
colInst := semconv.NewIdentifier("instance", types.ColumnTypeMetadata, types.Loki.String)
colJob := semconv.NewIdentifier("job", types.ColumnTypeMetadata, types.Loki.String)
schema := arrow.NewSchema(
[]arrow.Field{
semconv.FieldFromIdent(colTs, false),
semconv.FieldFromIdent(colVal, false),
semconv.FieldFromIdent(colInst, false),
semconv.FieldFromIdent(colJob, false),
},
nil,
)
rows := arrowtest.Rows{
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colVal.FQN(): float64(42), colInst.FQN(): "localhost:9090", colJob.FQN(): "prometheus"},
{colTs.FQN(): time.Unix(0, 1620000001000000000).UTC(), colVal.FQN(): float64(43), colInst.FQN(): "localhost:9090", colJob.FQN(): "prometheus"},
{colTs.FQN(): time.Unix(0, 1620000002000000000).UTC(), colVal.FQN(): float64(44), colInst.FQN(): "localhost:9090", colJob.FQN(): "prometheus"},
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colVal.FQN(): float64(23), colInst.FQN(): "localhost:9100", colJob.FQN(): "node-exporter"},
{colTs.FQN(): time.Unix(0, 1620000001000000000).UTC(), colVal.FQN(): float64(24), colInst.FQN(): "localhost:9100", colJob.FQN(): "node-exporter"},
{colTs.FQN(): time.Unix(0, 1620000002000000000).UTC(), colVal.FQN(): float64(25), colInst.FQN(): "localhost:9100", colJob.FQN(): "node-exporter"},
}
record := rows.Record(memory.DefaultAllocator, schema)
pipeline := executor.NewBufferedPipeline(record)
defer pipeline.Close()
builder := newMatrixResultBuilder()
err := collectResult(context.Background(), pipeline, builder)
require.NoError(t, err)
require.Equal(t, 6, builder.Len())
md, _ := metadata.NewContext(t.Context())
result := builder.Build(stats.Result{}, md)
matrix := result.Data.(promql.Matrix)
require.Equal(t, 2, len(matrix))
expected := promql.Matrix{
{
Metric: labels.FromStrings("instance", "localhost:9090", "job", "prometheus"),
Floats: []promql.FPoint{
{T: int64(1620000000000), F: 42.0},
{T: int64(1620000001000), F: 43.0},
{T: int64(1620000002000), F: 44.0},
},
},
{
Metric: labels.FromStrings("instance", "localhost:9100", "job", "node-exporter"),
Floats: []promql.FPoint{
{T: int64(1620000000000), F: 23.0},
{T: int64(1620000001000), F: 24.0},
{T: int64(1620000002000), F: 25.0},
},
},
}
require.Equal(t, expected, matrix)
})
}