mirror of https://github.com/grafana/loki
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
995 lines
38 KiB
995 lines
38 KiB
package engine
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/apache/arrow-go/v18/arrow"
|
|
"github.com/apache/arrow-go/v18/arrow/memory"
|
|
"github.com/prometheus/prometheus/model/labels"
|
|
"github.com/prometheus/prometheus/promql"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/grafana/loki/v3/pkg/engine/internal/executor"
|
|
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
|
|
"github.com/grafana/loki/v3/pkg/engine/internal/types"
|
|
"github.com/grafana/loki/v3/pkg/logproto"
|
|
"github.com/grafana/loki/v3/pkg/logqlmodel"
|
|
"github.com/grafana/loki/v3/pkg/logqlmodel/metadata"
|
|
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
|
|
"github.com/grafana/loki/v3/pkg/util/arrowtest"
|
|
|
|
"github.com/grafana/loki/pkg/push"
|
|
)
|
|
|
|
func TestStreamsResultBuilder(t *testing.T) {
|
|
t.Run("empty builder returns non-nil result", func(t *testing.T) {
|
|
builder := newStreamsResultBuilder(logproto.BACKWARD, false)
|
|
md, _ := metadata.NewContext(t.Context())
|
|
require.NotNil(t, builder.Build(stats.Result{}, md).Data)
|
|
})
|
|
|
|
t.Run("rows without timestamp, or labels are ignored", func(t *testing.T) {
|
|
colTs := semconv.ColumnIdentTimestamp
|
|
colMsg := semconv.ColumnIdentMessage
|
|
colEnv := semconv.NewIdentifier("env", types.ColumnTypeMetadata, types.Loki.String)
|
|
|
|
schema := arrow.NewSchema(
|
|
[]arrow.Field{
|
|
semconv.FieldFromIdent(colTs, true), // in practice timestamp is not nullable!
|
|
semconv.FieldFromIdent(colMsg, true), // in practice message is not nullable!
|
|
semconv.FieldFromIdent(colEnv, true),
|
|
},
|
|
nil,
|
|
)
|
|
rows := arrowtest.Rows{
|
|
{
|
|
colTs.FQN(): time.Unix(0, 1620000000000000001).UTC(),
|
|
colMsg.FQN(): nil,
|
|
colEnv.FQN(): "prod",
|
|
},
|
|
{
|
|
colTs.FQN(): nil,
|
|
colMsg.FQN(): "log line",
|
|
colEnv.FQN(): "prod",
|
|
},
|
|
{
|
|
colTs.FQN(): time.Unix(0, 1620000000000000003).UTC(),
|
|
colMsg.FQN(): "log line",
|
|
colEnv.FQN(): nil,
|
|
},
|
|
}
|
|
|
|
record := rows.Record(memory.DefaultAllocator, schema)
|
|
|
|
pipeline := executor.NewBufferedPipeline(record)
|
|
defer pipeline.Close()
|
|
|
|
builder := newStreamsResultBuilder(logproto.BACKWARD, false)
|
|
err := collectResult(context.Background(), pipeline, builder)
|
|
|
|
require.NoError(t, err)
|
|
require.Equal(t, 1, builder.Len(), "expected 1 entry to be collected")
|
|
})
|
|
|
|
t.Run("successful conversion of labels, log line, timestamp, and structured metadata ", func(t *testing.T) {
|
|
colTs := semconv.ColumnIdentTimestamp
|
|
colMsg := semconv.ColumnIdentMessage
|
|
colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
|
|
colNs := semconv.NewIdentifier("namespace", types.ColumnTypeLabel, types.Loki.String)
|
|
colTid := semconv.NewIdentifier("traceID", types.ColumnTypeMetadata, types.Loki.String)
|
|
|
|
schema := arrow.NewSchema(
|
|
[]arrow.Field{
|
|
semconv.FieldFromIdent(colTs, false),
|
|
semconv.FieldFromIdent(colMsg, false),
|
|
semconv.FieldFromIdent(colEnv, true),
|
|
semconv.FieldFromIdent(colNs, true),
|
|
semconv.FieldFromIdent(colTid, true),
|
|
},
|
|
nil,
|
|
)
|
|
rows := arrowtest.Rows{
|
|
{
|
|
colTs.FQN(): nil,
|
|
colMsg.FQN(): "log line 0 (must be skipped)",
|
|
colEnv.FQN(): "dev",
|
|
colNs.FQN(): "loki-dev-001",
|
|
colTid.FQN(): "860e403fcf754312",
|
|
},
|
|
{
|
|
colTs.FQN(): time.Unix(0, 1620000000000000001).UTC(),
|
|
colMsg.FQN(): "log line 1",
|
|
colEnv.FQN(): "dev",
|
|
colNs.FQN(): "loki-dev-001",
|
|
colTid.FQN(): "860e403fcf754312",
|
|
},
|
|
{
|
|
colTs.FQN(): time.Unix(0, 1620000000000000002).UTC(),
|
|
colMsg.FQN(): "log line 2",
|
|
colEnv.FQN(): "prod",
|
|
colNs.FQN(): "loki-prod-001",
|
|
colTid.FQN(): "46ce02549441e41c",
|
|
},
|
|
{
|
|
colTs.FQN(): time.Unix(0, 1620000000000000003).UTC(),
|
|
colMsg.FQN(): "log line 3",
|
|
colEnv.FQN(): "dev",
|
|
colNs.FQN(): "loki-dev-002",
|
|
colTid.FQN(): "61330481e1e59b18",
|
|
},
|
|
{
|
|
colTs.FQN(): time.Unix(0, 1620000000000000004).UTC(),
|
|
colMsg.FQN(): "log line 4",
|
|
colEnv.FQN(): "prod",
|
|
colNs.FQN(): "loki-prod-001",
|
|
colTid.FQN(): "40e50221e284b9d2",
|
|
},
|
|
{
|
|
colTs.FQN(): time.Unix(0, 1620000000000000005).UTC(),
|
|
colMsg.FQN(): "log line 5",
|
|
colEnv.FQN(): "dev",
|
|
colNs.FQN(): "loki-dev-002",
|
|
colTid.FQN(): "0cf883f112ad239b",
|
|
},
|
|
{
|
|
colTs.FQN(): time.Unix(0, 1620000000000000006).UTC(),
|
|
colMsg.FQN(): "log line 6",
|
|
colEnv.FQN(): "dev",
|
|
colNs.FQN(): nil,
|
|
colTid.FQN(): "9de325g124ad230b",
|
|
},
|
|
}
|
|
|
|
record := rows.Record(memory.DefaultAllocator, schema)
|
|
|
|
pipeline := executor.NewBufferedPipeline(record)
|
|
defer pipeline.Close()
|
|
|
|
builder := newStreamsResultBuilder(logproto.BACKWARD, false)
|
|
err := collectResult(context.Background(), pipeline, builder)
|
|
|
|
require.NoError(t, err)
|
|
require.Equal(t, 6, builder.Len())
|
|
|
|
md, _ := metadata.NewContext(t.Context())
|
|
result := builder.Build(stats.Result{}, md)
|
|
require.Equal(t, 6, result.Data.(logqlmodel.Streams).Len())
|
|
|
|
expected := logqlmodel.Streams{
|
|
push.Stream{
|
|
Labels: labels.FromStrings("env", "dev", "namespace", "loki-dev-001", "traceID", "860e403fcf754312").String(),
|
|
Entries: []logproto.Entry{
|
|
{Line: "log line 1", Timestamp: time.Unix(0, 1620000000000000001), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "860e403fcf754312")), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
|
|
},
|
|
},
|
|
push.Stream{
|
|
Labels: labels.FromStrings("env", "dev", "namespace", "loki-dev-002", "traceID", "0cf883f112ad239b").String(),
|
|
Entries: []logproto.Entry{
|
|
{Line: "log line 5", Timestamp: time.Unix(0, 1620000000000000005), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "0cf883f112ad239b")), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
|
|
},
|
|
},
|
|
push.Stream{
|
|
Labels: labels.FromStrings("env", "dev", "namespace", "loki-dev-002", "traceID", "61330481e1e59b18").String(),
|
|
Entries: []logproto.Entry{
|
|
{Line: "log line 3", Timestamp: time.Unix(0, 1620000000000000003), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "61330481e1e59b18")), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
|
|
},
|
|
},
|
|
push.Stream{
|
|
Labels: labels.FromStrings("env", "dev", "traceID", "9de325g124ad230b").String(),
|
|
Entries: []logproto.Entry{
|
|
{Line: "log line 6", Timestamp: time.Unix(0, 1620000000000000006), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "9de325g124ad230b")), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
|
|
},
|
|
},
|
|
push.Stream{
|
|
Labels: labels.FromStrings("env", "prod", "namespace", "loki-prod-001", "traceID", "40e50221e284b9d2").String(),
|
|
Entries: []logproto.Entry{
|
|
{Line: "log line 4", Timestamp: time.Unix(0, 1620000000000000004), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "40e50221e284b9d2")), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
|
|
},
|
|
},
|
|
push.Stream{
|
|
Labels: labels.FromStrings("env", "prod", "namespace", "loki-prod-001", "traceID", "46ce02549441e41c").String(),
|
|
Entries: []logproto.Entry{
|
|
{Line: "log line 2", Timestamp: time.Unix(0, 1620000000000000002), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "46ce02549441e41c")), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
|
|
},
|
|
},
|
|
}
|
|
require.Equal(t, expected, result.Data.(logqlmodel.Streams))
|
|
})
|
|
|
|
t.Run("multiple records with different streams are accumulated correctly", func(t *testing.T) {
|
|
colTs := semconv.ColumnIdentTimestamp
|
|
colMsg := semconv.ColumnIdentMessage
|
|
colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
|
|
|
|
schema := arrow.NewSchema(
|
|
[]arrow.Field{
|
|
semconv.FieldFromIdent(colTs, false),
|
|
semconv.FieldFromIdent(colMsg, false),
|
|
semconv.FieldFromIdent(colEnv, false),
|
|
},
|
|
nil,
|
|
)
|
|
|
|
// First record: prod and dev streams
|
|
rows1 := arrowtest.Rows{
|
|
{
|
|
colTs.FQN(): time.Unix(0, 1620000000000000001).UTC(),
|
|
colMsg.FQN(): "log line 1",
|
|
colEnv.FQN(): "prod",
|
|
},
|
|
{
|
|
colTs.FQN(): time.Unix(0, 1620000000000000002).UTC(),
|
|
colMsg.FQN(): "log line 2",
|
|
colEnv.FQN(): "dev",
|
|
},
|
|
}
|
|
record1 := rows1.Record(memory.DefaultAllocator, schema)
|
|
defer record1.Release()
|
|
|
|
// Second record: prod and staging streams
|
|
rows2 := arrowtest.Rows{
|
|
{
|
|
colTs.FQN(): time.Unix(0, 1620000000000000003).UTC(),
|
|
colMsg.FQN(): "log line 3",
|
|
colEnv.FQN(): "prod",
|
|
},
|
|
{
|
|
colTs.FQN(): time.Unix(0, 1620000000000000004).UTC(),
|
|
colMsg.FQN(): "log line 4",
|
|
colEnv.FQN(): "staging",
|
|
},
|
|
}
|
|
record2 := rows2.Record(memory.DefaultAllocator, schema)
|
|
defer record2.Release()
|
|
|
|
builder := newStreamsResultBuilder(logproto.FORWARD, false)
|
|
|
|
// Collect first record
|
|
builder.CollectRecord(record1)
|
|
require.Equal(t, 2, builder.Len(), "should have 2 entries after first record")
|
|
|
|
// Collect second record
|
|
builder.CollectRecord(record2)
|
|
require.Equal(t, 4, builder.Len(), "should have 4 entries total after second record")
|
|
|
|
md, _ := metadata.NewContext(t.Context())
|
|
result := builder.Build(stats.Result{}, md)
|
|
streams := result.Data.(logqlmodel.Streams)
|
|
// Note: 3 unique streams (dev, prod, staging), but 4 total entries
|
|
// The prod stream has 2 entries (one from each record)
|
|
require.Equal(t, 3, len(streams), "should have 3 unique streams")
|
|
|
|
// Verify stream grouping - prod stream should have entries from both records
|
|
expected := logqlmodel.Streams{
|
|
push.Stream{
|
|
Labels: labels.FromStrings("env", "dev").String(),
|
|
Entries: []logproto.Entry{
|
|
{Line: "log line 2", Timestamp: time.Unix(0, 1620000000000000002), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.Labels{}), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
|
|
},
|
|
},
|
|
push.Stream{
|
|
Labels: labels.FromStrings("env", "prod").String(),
|
|
Entries: []logproto.Entry{
|
|
{Line: "log line 1", Timestamp: time.Unix(0, 1620000000000000001), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.Labels{}), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
|
|
{Line: "log line 3", Timestamp: time.Unix(0, 1620000000000000003), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.Labels{}), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
|
|
},
|
|
},
|
|
push.Stream{
|
|
Labels: labels.FromStrings("env", "staging").String(),
|
|
Entries: []logproto.Entry{
|
|
{Line: "log line 4", Timestamp: time.Unix(0, 1620000000000000004), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.Labels{}), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
|
|
},
|
|
},
|
|
}
|
|
require.Equal(t, expected, streams)
|
|
})
|
|
|
|
t.Run("buffer reuse with varying record sizes", func(t *testing.T) {
|
|
colTs := semconv.ColumnIdentTimestamp
|
|
colMsg := semconv.ColumnIdentMessage
|
|
colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
|
|
|
|
schema := arrow.NewSchema(
|
|
[]arrow.Field{
|
|
semconv.FieldFromIdent(colTs, false),
|
|
semconv.FieldFromIdent(colMsg, false),
|
|
semconv.FieldFromIdent(colEnv, false),
|
|
},
|
|
nil,
|
|
)
|
|
|
|
builder := newStreamsResultBuilder(logproto.BACKWARD, false)
|
|
|
|
// First record: 5 rows (buffer grows to 5)
|
|
rows1 := make(arrowtest.Rows, 5)
|
|
for i := 0; i < 5; i++ {
|
|
rows1[i] = arrowtest.Row{
|
|
colTs.FQN(): time.Unix(0, int64(1620000000000000001+i)).UTC(),
|
|
colMsg.FQN(): "log line",
|
|
colEnv.FQN(): "prod",
|
|
}
|
|
}
|
|
record1 := rows1.Record(memory.DefaultAllocator, schema)
|
|
builder.CollectRecord(record1)
|
|
record1.Release()
|
|
require.Equal(t, 5, builder.Len())
|
|
require.Equal(t, 5, len(builder.rowBuilders), "buffer should have 5 rowBuilders")
|
|
|
|
// Second record: 2 rows (buffer shrinks to 2)
|
|
rows2 := make(arrowtest.Rows, 2)
|
|
for i := 0; i < 2; i++ {
|
|
rows2[i] = arrowtest.Row{
|
|
colTs.FQN(): time.Unix(0, int64(1620000000000000010+i)).UTC(),
|
|
colMsg.FQN(): "log line",
|
|
colEnv.FQN(): "dev",
|
|
}
|
|
}
|
|
record2 := rows2.Record(memory.DefaultAllocator, schema)
|
|
builder.CollectRecord(record2)
|
|
record2.Release()
|
|
require.Equal(t, 7, builder.Len())
|
|
require.Equal(t, 2, len(builder.rowBuilders), "buffer should shrink to 2 rowBuilders")
|
|
|
|
// Third record: 10 rows (buffer grows to 10)
|
|
rows3 := make(arrowtest.Rows, 10)
|
|
for i := 0; i < 10; i++ {
|
|
rows3[i] = arrowtest.Row{
|
|
colTs.FQN(): time.Unix(0, int64(1620000000000000020+i)).UTC(),
|
|
colMsg.FQN(): "log line",
|
|
colEnv.FQN(): "staging",
|
|
}
|
|
}
|
|
record3 := rows3.Record(memory.DefaultAllocator, schema)
|
|
builder.CollectRecord(record3)
|
|
record3.Release()
|
|
require.Equal(t, 17, builder.Len())
|
|
require.Equal(t, 10, len(builder.rowBuilders), "buffer should grow to 10 rowBuilders")
|
|
|
|
// Verify all rowBuilders are properly initialized
|
|
for i := 0; i < len(builder.rowBuilders); i++ {
|
|
require.NotNil(t, builder.rowBuilders[i].lbsBuilder, "lbsBuilder should be initialized")
|
|
require.NotNil(t, builder.rowBuilders[i].metadataBuilder, "metadataBuilder should be initialized")
|
|
require.NotNil(t, builder.rowBuilders[i].parsedBuilder, "parsedBuilder should be initialized")
|
|
require.Equal(t, 0, len(builder.rowBuilders[i].parsedEmptyKeys), "parsedEmptyKeys should be empty")
|
|
}
|
|
})
|
|
|
|
t.Run("empty records mixed with valid records", func(t *testing.T) {
|
|
colTs := semconv.ColumnIdentTimestamp
|
|
colMsg := semconv.ColumnIdentMessage
|
|
colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
|
|
|
|
schema := arrow.NewSchema(
|
|
[]arrow.Field{
|
|
semconv.FieldFromIdent(colTs, false),
|
|
semconv.FieldFromIdent(colMsg, false),
|
|
semconv.FieldFromIdent(colEnv, false),
|
|
},
|
|
nil,
|
|
)
|
|
|
|
builder := newStreamsResultBuilder(logproto.BACKWARD, false)
|
|
|
|
// First record: 3 valid rows
|
|
rows1 := make(arrowtest.Rows, 3)
|
|
for i := 0; i < 3; i++ {
|
|
rows1[i] = arrowtest.Row{
|
|
colTs.FQN(): time.Unix(0, int64(1620000000000000001+i)).UTC(),
|
|
colMsg.FQN(): "log line",
|
|
colEnv.FQN(): "prod",
|
|
}
|
|
}
|
|
record1 := rows1.Record(memory.DefaultAllocator, schema)
|
|
builder.CollectRecord(record1)
|
|
record1.Release()
|
|
require.Equal(t, 3, builder.Len())
|
|
|
|
// Second record: empty (0 rows)
|
|
rows2 := arrowtest.Rows{}
|
|
record2 := rows2.Record(memory.DefaultAllocator, schema)
|
|
builder.CollectRecord(record2)
|
|
record2.Release()
|
|
require.Equal(t, 3, builder.Len(), "empty record should not change count")
|
|
|
|
// Third record: 2 valid rows
|
|
rows3 := make(arrowtest.Rows, 2)
|
|
for i := 0; i < 2; i++ {
|
|
rows3[i] = arrowtest.Row{
|
|
colTs.FQN(): time.Unix(0, int64(1620000000000000010+i)).UTC(),
|
|
colMsg.FQN(): "log line",
|
|
colEnv.FQN(): "dev",
|
|
}
|
|
}
|
|
record3 := rows3.Record(memory.DefaultAllocator, schema)
|
|
builder.CollectRecord(record3)
|
|
record3.Release()
|
|
require.Equal(t, 5, builder.Len(), "should have 5 total entries")
|
|
|
|
// Verify final result
|
|
md, _ := metadata.NewContext(t.Context())
|
|
result := builder.Build(stats.Result{}, md)
|
|
streams := result.Data.(logqlmodel.Streams)
|
|
// Note: 2 unique streams (prod with 3 entries, dev with 2 entries) = 5 total entries
|
|
require.Equal(t, 2, len(streams), "should have 2 unique streams")
|
|
|
|
// Verify the streams have the correct number of entries
|
|
var totalEntries int
|
|
for _, stream := range streams {
|
|
totalEntries += len(stream.Entries)
|
|
}
|
|
require.Equal(t, 5, totalEntries, "should have 5 total entries across both streams")
|
|
})
|
|
|
|
t.Run("parsed empty values are added to the parsed labels", func(t *testing.T) {
|
|
colTs := semconv.ColumnIdentTimestamp
|
|
colMsg := semconv.ColumnIdentMessage
|
|
colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
|
|
colMetadata := semconv.NewIdentifier("metadata", types.ColumnTypeMetadata, types.Loki.String)
|
|
colParsedA := semconv.NewIdentifier("Aparsed", types.ColumnTypeParsed, types.Loki.String)
|
|
colParsedZ := semconv.NewIdentifier("Zparsed", types.ColumnTypeParsed, types.Loki.String)
|
|
|
|
schema := arrow.NewSchema(
|
|
[]arrow.Field{
|
|
semconv.FieldFromIdent(colTs, false),
|
|
semconv.FieldFromIdent(colMsg, false),
|
|
semconv.FieldFromIdent(colEnv, false),
|
|
semconv.FieldFromIdent(colMetadata, false),
|
|
semconv.FieldFromIdent(colParsedA, false),
|
|
semconv.FieldFromIdent(colParsedZ, false),
|
|
},
|
|
nil,
|
|
)
|
|
rows := arrowtest.Rows{
|
|
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colMsg.FQN(): "log line", colEnv.FQN(): "prod", colMetadata.FQN(): "md value", colParsedA.FQN(): "A", colParsedZ.FQN(): "Z"},
|
|
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colMsg.FQN(): "log line", colEnv.FQN(): "prod", colMetadata.FQN(): "", colParsedA.FQN(): "", colParsedZ.FQN(): ""},
|
|
}
|
|
|
|
record := rows.Record(memory.DefaultAllocator, schema)
|
|
builder := newStreamsResultBuilder(logproto.BACKWARD, false)
|
|
builder.CollectRecord(record)
|
|
record.Release()
|
|
require.Equal(t, 2, builder.Len())
|
|
|
|
md, _ := metadata.NewContext(t.Context())
|
|
result := builder.Build(stats.Result{}, md)
|
|
streams := result.Data.(logqlmodel.Streams)
|
|
require.Equal(t, 2, len(streams), "should have 2 unique streams")
|
|
|
|
expected := logqlmodel.Streams{
|
|
push.Stream{
|
|
Labels: labels.FromStrings("Aparsed", "", "env", "prod", "Zparsed", "").String(),
|
|
Entries: []logproto.Entry{
|
|
{Line: "log line", Timestamp: time.Unix(0, 1620000000000000000), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.Labels{}), Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("Aparsed", "", "Zparsed", ""))},
|
|
},
|
|
},
|
|
push.Stream{
|
|
Labels: labels.FromStrings("Aparsed", "A", "env", "prod", "metadata", "md value", "Zparsed", "Z").String(),
|
|
Entries: []logproto.Entry{
|
|
{Line: "log line", Timestamp: time.Unix(0, 1620000000000000000), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("metadata", "md value")), Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("Aparsed", "A", "Zparsed", "Z"))},
|
|
},
|
|
},
|
|
}
|
|
require.Equal(t, expected, streams)
|
|
})
|
|
t.Run("labels with empty values are dropped from stream labels", func(t *testing.T) {
|
|
colTs := semconv.ColumnIdentTimestamp
|
|
colMsg := semconv.ColumnIdentMessage
|
|
colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
|
|
colRegion := semconv.NewIdentifier("region", types.ColumnTypeLabel, types.Loki.String)
|
|
|
|
schema := arrow.NewSchema(
|
|
[]arrow.Field{
|
|
semconv.FieldFromIdent(colTs, false),
|
|
semconv.FieldFromIdent(colMsg, false),
|
|
semconv.FieldFromIdent(colEnv, true),
|
|
semconv.FieldFromIdent(colRegion, true),
|
|
},
|
|
nil,
|
|
)
|
|
rows := arrowtest.Rows{
|
|
{
|
|
colTs.FQN(): time.Unix(0, 1620000000000000001).UTC(),
|
|
colMsg.FQN(): "log line 1",
|
|
colEnv.FQN(): "prod",
|
|
colRegion.FQN(): "us-west",
|
|
},
|
|
{
|
|
colTs.FQN(): time.Unix(0, 1620000000000000002).UTC(),
|
|
colMsg.FQN(): "log line 2",
|
|
colEnv.FQN(): "prod",
|
|
colRegion.FQN(): "",
|
|
},
|
|
{
|
|
colTs.FQN(): time.Unix(0, 1620000000000000003).UTC(),
|
|
colMsg.FQN(): "log line 3",
|
|
colEnv.FQN(): "",
|
|
colRegion.FQN(): "us-east",
|
|
},
|
|
}
|
|
|
|
record := rows.Record(memory.DefaultAllocator, schema)
|
|
|
|
pipeline := executor.NewBufferedPipeline(record)
|
|
defer pipeline.Close()
|
|
|
|
builder := newStreamsResultBuilder(logproto.FORWARD, false)
|
|
err := collectResult(context.Background(), pipeline, builder)
|
|
|
|
require.NoError(t, err)
|
|
require.Equal(t, 3, builder.Len())
|
|
|
|
md, _ := metadata.NewContext(t.Context())
|
|
result := builder.Build(stats.Result{}, md)
|
|
streams := result.Data.(logqlmodel.Streams)
|
|
|
|
require.Equal(t, 3, len(streams), "should have 3 unique streams (empty label values dropped)")
|
|
|
|
streamLabels := make([]string, len(streams))
|
|
for i, s := range streams {
|
|
streamLabels[i] = s.Labels
|
|
}
|
|
require.Contains(t, streamLabels, labels.FromStrings("env", "prod", "region", "us-west").String())
|
|
require.Contains(t, streamLabels, labels.FromStrings("env", "prod").String())
|
|
require.Contains(t, streamLabels, labels.FromStrings("region", "us-east").String())
|
|
})
|
|
|
|
t.Run("categorize labels does not consider metadata or parsed keys when building output streams", func(t *testing.T) {
|
|
colTs := semconv.ColumnIdentTimestamp
|
|
colMsg := semconv.ColumnIdentMessage
|
|
colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
|
|
colMetadata := semconv.NewIdentifier("metadata", types.ColumnTypeMetadata, types.Loki.String)
|
|
colParsed := semconv.NewIdentifier("parsed", types.ColumnTypeParsed, types.Loki.String)
|
|
|
|
schema := arrow.NewSchema(
|
|
[]arrow.Field{
|
|
semconv.FieldFromIdent(colTs, false),
|
|
semconv.FieldFromIdent(colMsg, false),
|
|
semconv.FieldFromIdent(colEnv, false),
|
|
semconv.FieldFromIdent(colMetadata, false),
|
|
semconv.FieldFromIdent(colParsed, false),
|
|
},
|
|
nil,
|
|
)
|
|
rows := arrowtest.Rows{
|
|
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colMsg.FQN(): "log line", colEnv.FQN(): "prod", colMetadata.FQN(): "a md value", colParsed.FQN(): "a parsed value"},
|
|
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colMsg.FQN(): "log line", colEnv.FQN(): "prod", colMetadata.FQN(): "another md value", colParsed.FQN(): "another parsed value"},
|
|
}
|
|
|
|
record := rows.Record(memory.DefaultAllocator, schema)
|
|
builder := newStreamsResultBuilder(logproto.BACKWARD, true)
|
|
builder.CollectRecord(record)
|
|
record.Release()
|
|
require.Equal(t, 2, builder.Len())
|
|
|
|
md, _ := metadata.NewContext(t.Context())
|
|
result := builder.Build(stats.Result{}, md)
|
|
streams := result.Data.(logqlmodel.Streams)
|
|
require.Equal(t, 1, len(streams), "should have 1 unique stream")
|
|
|
|
expected := logqlmodel.Streams{
|
|
push.Stream{
|
|
Labels: labels.FromStrings("env", "prod").String(),
|
|
Entries: []logproto.Entry{
|
|
{Line: "log line", Timestamp: time.Unix(0, 1620000000000000000), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("metadata", "a md value")), Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("parsed", "a parsed value"))},
|
|
{Line: "log line", Timestamp: time.Unix(0, 1620000000000000000), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("metadata", "another md value")), Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("parsed", "another parsed value"))},
|
|
},
|
|
},
|
|
}
|
|
require.Equal(t, expected, streams)
|
|
})
|
|
|
|
t.Run("duplicate entries from multiple records are deduplicated", func(t *testing.T) {
|
|
colTs := semconv.ColumnIdentTimestamp
|
|
colMsg := semconv.ColumnIdentMessage
|
|
colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
|
|
|
|
schema := arrow.NewSchema(
|
|
[]arrow.Field{
|
|
semconv.FieldFromIdent(colTs, false),
|
|
semconv.FieldFromIdent(colMsg, false),
|
|
semconv.FieldFromIdent(colEnv, false),
|
|
},
|
|
nil,
|
|
)
|
|
|
|
ts1 := time.Unix(0, 1620000000000000001).UTC()
|
|
ts2 := time.Unix(0, 1620000000000000002).UTC()
|
|
ts3 := time.Unix(0, 1620000000000000003).UTC()
|
|
|
|
// Simulate two data object scans returning overlapping entries
|
|
// (same entry stored in multiple data objects).
|
|
rec1 := arrowtest.Rows{
|
|
{colTs.FQN(): ts1, colMsg.FQN(): "line A", colEnv.FQN(): "prod"},
|
|
{colTs.FQN(): ts2, colMsg.FQN(): "line B", colEnv.FQN(): "prod"},
|
|
{colTs.FQN(): ts3, colMsg.FQN(): "line C", colEnv.FQN(): "prod"},
|
|
}.Record(memory.DefaultAllocator, schema)
|
|
|
|
rec2 := arrowtest.Rows{
|
|
{colTs.FQN(): ts2, colMsg.FQN(): "line B", colEnv.FQN(): "prod"},
|
|
{colTs.FQN(): ts3, colMsg.FQN(): "line C", colEnv.FQN(): "prod"},
|
|
}.Record(memory.DefaultAllocator, schema)
|
|
|
|
builder := newStreamsResultBuilder(logproto.FORWARD, false)
|
|
builder.CollectRecord(rec1)
|
|
builder.CollectRecord(rec2)
|
|
rec1.Release()
|
|
rec2.Release()
|
|
|
|
require.Equal(t, 5, builder.Len(), "raw count before dedup")
|
|
|
|
md, _ := metadata.NewContext(t.Context())
|
|
result := builder.Build(stats.Result{}, md)
|
|
streams := result.Data.(logqlmodel.Streams)
|
|
require.Equal(t, 1, len(streams), "should have 1 unique stream")
|
|
require.Equal(t, 3, len(streams[0].Entries), "duplicates should be removed")
|
|
require.Equal(t, int64(3), result.Statistics.Summary.TotalEntriesReturned,
|
|
"stats should reflect post-dedup count")
|
|
|
|
expected := logqlmodel.Streams{
|
|
push.Stream{
|
|
Labels: labels.FromStrings("env", "prod").String(),
|
|
Entries: []logproto.Entry{
|
|
{Timestamp: time.Unix(0, ts1.UnixNano()), Line: "line A"},
|
|
{Timestamp: time.Unix(0, ts2.UnixNano()), Line: "line B"},
|
|
{Timestamp: time.Unix(0, ts3.UnixNano()), Line: "line C"},
|
|
},
|
|
},
|
|
}
|
|
require.Equal(t, expected, streams)
|
|
})
|
|
|
|
// Regression test: entries with different __error__ values must NOT be split into separate streams.
|
|
//
|
|
// BUG: When categorizeLabels=false, the new engine was incorrectly using __error__ and
|
|
// __error_details__ for stream GROUPING. This caused entries with different error messages
|
|
// to be placed in separate streams.
|
|
//
|
|
// Example: A query like `{app="foo"} | logfmt` parsing logs with invalid logfmt syntax
|
|
// would produce errors like "logfmt syntax error at pos 74" and "logfmt syntax error at pos 75".
|
|
// The old engine correctly keeps all entries in one stream, but the buggy new engine would
|
|
// create separate streams based on the different error details.
|
|
//
|
|
// The fix ensures error labels are excluded from the stream grouping key, matching classic
|
|
// Loki engine behavior. Error labels are still included in stream labels (when the first
|
|
// entry has errors) and in parsed labels (for all entries with errors).
|
|
t.Run("regression: error labels must not cause stream splitting", func(t *testing.T) {
|
|
colTs := semconv.ColumnIdentTimestamp
|
|
colMsg := semconv.ColumnIdentMessage
|
|
colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
|
|
colError := semconv.NewIdentifier(types.ColumnNameError, types.ColumnTypeGenerated, types.Loki.String)
|
|
colErrorDetails := semconv.NewIdentifier(types.ColumnNameErrorDetails, types.ColumnTypeGenerated, types.Loki.String)
|
|
|
|
schema := arrow.NewSchema(
|
|
[]arrow.Field{
|
|
semconv.FieldFromIdent(colTs, false),
|
|
semconv.FieldFromIdent(colMsg, false),
|
|
semconv.FieldFromIdent(colEnv, false),
|
|
semconv.FieldFromIdent(colError, true),
|
|
semconv.FieldFromIdent(colErrorDetails, true),
|
|
},
|
|
nil,
|
|
)
|
|
|
|
rows := arrowtest.Rows{
|
|
// Entry 1: logfmt parse error at position 74 (first entry, sets stream labels)
|
|
{
|
|
colTs.FQN(): time.Unix(0, 1620000000000000001).UTC(),
|
|
colMsg.FQN(): "invalid {json broken at pos 74",
|
|
colEnv.FQN(): "prod",
|
|
colError.FQN(): "LogfmtParserErr",
|
|
colErrorDetails.FQN(): "logfmt syntax error at pos 74",
|
|
},
|
|
// Entry 2: logfmt parse error at position 75 (DIFFERENT error details)
|
|
// BUG: This entry was incorrectly placed in a separate stream due to different __error_details__
|
|
{
|
|
colTs.FQN(): time.Unix(0, 1620000000000000002).UTC(),
|
|
colMsg.FQN(): "invalid {json broken at pos 75",
|
|
colEnv.FQN(): "prod",
|
|
colError.FQN(): "LogfmtParserErr",
|
|
colErrorDetails.FQN(): "logfmt syntax error at pos 75",
|
|
},
|
|
// Entry 3: logfmt parse error at position 76 (yet another DIFFERENT error details)
|
|
{
|
|
colTs.FQN(): time.Unix(0, 1620000000000000003).UTC(),
|
|
colMsg.FQN(): "invalid {json broken at pos 76",
|
|
colEnv.FQN(): "prod",
|
|
colError.FQN(): "LogfmtParserErr",
|
|
colErrorDetails.FQN(): "logfmt syntax error at pos 76",
|
|
},
|
|
}
|
|
|
|
record := rows.Record(memory.DefaultAllocator, schema)
|
|
defer record.Release()
|
|
|
|
// categorizeLabels=false is the default API behavior where parsed labels
|
|
// are merged into stream labels.
|
|
builder := newStreamsResultBuilder(logproto.FORWARD, false)
|
|
builder.CollectRecord(record)
|
|
|
|
require.Equal(t, 3, builder.Len(), "should have 3 entries")
|
|
|
|
md, _ := metadata.NewContext(t.Context())
|
|
result := builder.Build(stats.Result{}, md)
|
|
streams := result.Data.(logqlmodel.Streams)
|
|
|
|
// CRITICAL ASSERTION: All 3 entries with different error details must be in ONE stream.
|
|
// Before the fix, this would fail with len(streams) == 3 (each entry in its own stream
|
|
// due to different __error_details__ values being used for grouping).
|
|
require.Equal(t, 1, len(streams), "BUG: error labels caused stream splitting - expected 1 stream, got %d", len(streams))
|
|
require.Equal(t, 3, len(streams[0].Entries), "all 3 entries should be in the same stream")
|
|
|
|
// Verify error labels ARE present in stream labels (set by the first entry)
|
|
streamLabels := streams[0].Labels
|
|
require.Contains(t, streamLabels, types.ColumnNameError,
|
|
"stream labels should contain __error__ (from first entry)")
|
|
|
|
// Verify error labels ARE present in the parsed labels of each entry
|
|
// Helper to find a label in parsed labels
|
|
findParsedLabel := func(entry logproto.Entry, name string) (string, bool) {
|
|
for _, l := range entry.Parsed {
|
|
if l.Name == name {
|
|
return l.Value, true
|
|
}
|
|
}
|
|
return "", false
|
|
}
|
|
|
|
// All entries should have __error__ in parsed labels
|
|
for i, entry := range streams[0].Entries {
|
|
errVal, found := findParsedLabel(entry, types.ColumnNameError)
|
|
require.True(t, found, "entry %d should have __error__ in parsed labels", i)
|
|
require.Equal(t, "LogfmtParserErr", errVal)
|
|
|
|
// Each entry should have its own __error_details__ value
|
|
_, found = findParsedLabel(entry, types.ColumnNameErrorDetails)
|
|
require.True(t, found, "entry %d should have __error_details__ in parsed labels", i)
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestVectorResultBuilder(t *testing.T) {
|
|
t.Run("empty builder returns non-nil result", func(t *testing.T) {
|
|
builder := newVectorResultBuilder()
|
|
md, _ := metadata.NewContext(t.Context())
|
|
require.NotNil(t, builder.Build(stats.Result{}, md).Data)
|
|
})
|
|
|
|
t.Run("successful conversion of vector data", func(t *testing.T) {
|
|
colTs := semconv.ColumnIdentTimestamp
|
|
colVal := semconv.ColumnIdentValue
|
|
colInst := semconv.NewIdentifier("instance", types.ColumnTypeMetadata, types.Loki.String)
|
|
colJob := semconv.NewIdentifier("job", types.ColumnTypeMetadata, types.Loki.String)
|
|
|
|
schema := arrow.NewSchema(
|
|
[]arrow.Field{
|
|
semconv.FieldFromIdent(colTs, false),
|
|
semconv.FieldFromIdent(colVal, false),
|
|
semconv.FieldFromIdent(colInst, false),
|
|
semconv.FieldFromIdent(colJob, false),
|
|
},
|
|
nil,
|
|
)
|
|
rows := arrowtest.Rows{
|
|
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colVal.FQN(): float64(42), colInst.FQN(): "localhost:9090", colJob.FQN(): "prometheus"},
|
|
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colVal.FQN(): float64(23), colInst.FQN(): "localhost:9100", colJob.FQN(): "node-exporter"},
|
|
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colVal.FQN(): float64(32), colInst.FQN(): nil, colJob.FQN(): "node-exporter"},
|
|
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colVal.FQN(): float64(15), colInst.FQN(): "localhost:9100", colJob.FQN(): "prometheus"},
|
|
}
|
|
|
|
record := rows.Record(memory.DefaultAllocator, schema)
|
|
|
|
pipeline := executor.NewBufferedPipeline(record)
|
|
defer pipeline.Close()
|
|
|
|
builder := newVectorResultBuilder()
|
|
err := collectResult(context.Background(), pipeline, builder)
|
|
|
|
require.NoError(t, err)
|
|
require.Equal(t, 4, builder.Len())
|
|
|
|
md, _ := metadata.NewContext(t.Context())
|
|
result := builder.Build(stats.Result{}, md)
|
|
vector := result.Data.(promql.Vector)
|
|
require.Equal(t, 4, len(vector))
|
|
|
|
expected := promql.Vector{
|
|
{
|
|
T: int64(1620000000000),
|
|
F: 42.0,
|
|
Metric: labels.FromStrings("instance", "localhost:9090", "job", "prometheus"),
|
|
},
|
|
{
|
|
T: int64(1620000000000),
|
|
F: 23.0,
|
|
Metric: labels.FromStrings("instance", "localhost:9100", "job", "node-exporter"),
|
|
},
|
|
{
|
|
T: int64(1620000000000),
|
|
F: 15.0,
|
|
Metric: labels.FromStrings("instance", "localhost:9100", "job", "prometheus"),
|
|
},
|
|
{
|
|
T: int64(1620000000000),
|
|
F: 32.0,
|
|
Metric: labels.FromStrings("job", "node-exporter"),
|
|
},
|
|
}
|
|
require.Equal(t, expected, vector)
|
|
})
|
|
|
|
t.Run("empty parsed label values are preserved in vector results", func(t *testing.T) {
|
|
colTs := semconv.ColumnIdentTimestamp
|
|
colVal := semconv.ColumnIdentValue
|
|
colEnv := semconv.NewIdentifier("env", types.ColumnTypeParsed, types.Loki.String)
|
|
colScheme := semconv.NewIdentifier("http_url_scheme", types.ColumnTypeParsed, types.Loki.String)
|
|
|
|
schema := arrow.NewSchema(
|
|
[]arrow.Field{
|
|
semconv.FieldFromIdent(colTs, false),
|
|
semconv.FieldFromIdent(colVal, false),
|
|
semconv.FieldFromIdent(colEnv, true),
|
|
semconv.FieldFromIdent(colScheme, true),
|
|
},
|
|
nil,
|
|
)
|
|
ts := time.Unix(0, 1620000000000000000).UTC()
|
|
rows := arrowtest.Rows{
|
|
// Non-empty parsed label value — no change in behaviour.
|
|
{colTs.FQN(): ts, colVal.FQN(): float64(1), colEnv.FQN(): "prod", colScheme.FQN(): "https"},
|
|
// Empty parsed label value (e.g. `| json` parsed http_url_scheme as "").
|
|
// Must appear in the result with value "" rather than being absent.
|
|
{colTs.FQN(): ts, colVal.FQN(): float64(2), colEnv.FQN(): "dev", colScheme.FQN(): ""},
|
|
// Absent parsed label (NULL — aggregator called AppendNull for this series).
|
|
// Must NOT appear in the result at all.
|
|
{colTs.FQN(): ts, colVal.FQN(): float64(3), colEnv.FQN(): "staging", colScheme.FQN(): nil},
|
|
}
|
|
|
|
record := rows.Record(memory.DefaultAllocator, schema)
|
|
|
|
pipeline := executor.NewBufferedPipeline(record)
|
|
defer pipeline.Close()
|
|
|
|
builder := newVectorResultBuilder()
|
|
err := collectResult(context.Background(), pipeline, builder)
|
|
|
|
require.NoError(t, err)
|
|
require.Equal(t, 3, builder.Len())
|
|
|
|
md, _ := metadata.NewContext(t.Context())
|
|
result := builder.Build(stats.Result{}, md)
|
|
vector := result.Data.(promql.Vector)
|
|
|
|
samples := make(map[string]*promql.Sample, len(vector))
|
|
for i := range vector {
|
|
env := vector[i].Metric.Get("env")
|
|
samples[env] = &vector[i]
|
|
}
|
|
|
|
// prod: non-empty value, must be present as-is.
|
|
require.NotNil(t, samples["prod"], "expected a sample with env=prod")
|
|
require.Equal(t, "https", samples["prod"].Metric.Get("http_url_scheme"))
|
|
|
|
// dev: empty string value, must be present in the label set.
|
|
require.NotNil(t, samples["dev"], "expected a sample with env=dev")
|
|
devHasScheme := false
|
|
samples["dev"].Metric.Range(func(l labels.Label) {
|
|
if l.Name == "http_url_scheme" {
|
|
devHasScheme = true
|
|
require.Equal(t, "", l.Value)
|
|
}
|
|
})
|
|
require.True(t, devHasScheme, "dev: http_url_scheme with empty value must be present in the label set")
|
|
|
|
// staging: absent label (NULL), must not appear at all.
|
|
require.NotNil(t, samples["staging"], "expected a sample with env=staging")
|
|
stagingHasScheme := false
|
|
samples["staging"].Metric.Range(func(l labels.Label) {
|
|
if l.Name == "http_url_scheme" {
|
|
stagingHasScheme = true
|
|
}
|
|
})
|
|
require.False(t, stagingHasScheme, "staging: absent label (NULL) must not appear in the label set")
|
|
})
|
|
|
|
// TODO:(ashwanth) also enforce grouping labels are all present?
|
|
t.Run("rows without timestamp or value are ignored", func(t *testing.T) {
|
|
colTs := semconv.ColumnIdentTimestamp
|
|
colVal := semconv.ColumnIdentValue
|
|
colInst := semconv.NewIdentifier("instance", types.ColumnTypeMetadata, types.Loki.String)
|
|
|
|
schema := arrow.NewSchema(
|
|
[]arrow.Field{
|
|
semconv.FieldFromIdent(colTs, false),
|
|
semconv.FieldFromIdent(colVal, false),
|
|
semconv.FieldFromIdent(colInst, false),
|
|
},
|
|
nil,
|
|
)
|
|
rows := arrowtest.Rows{
|
|
{colTs.FQN(): nil, colVal.FQN(): float64(42), colInst.FQN(): "localhost:9090"},
|
|
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colVal.FQN(): nil, colInst.FQN(): "localhost:9100"},
|
|
}
|
|
|
|
record := rows.Record(memory.DefaultAllocator, schema)
|
|
|
|
pipeline := executor.NewBufferedPipeline(record)
|
|
defer pipeline.Close()
|
|
|
|
builder := newVectorResultBuilder()
|
|
err := collectResult(context.Background(), pipeline, builder)
|
|
|
|
require.NoError(t, err)
|
|
require.Equal(t, 0, builder.Len(), "expected no samples to be collected")
|
|
})
|
|
}
|
|
|
|
func TestMatrixResultBuilder(t *testing.T) {
|
|
t.Run("empty builder returns non-nil result", func(t *testing.T) {
|
|
builder := newMatrixResultBuilder()
|
|
md, _ := metadata.NewContext(t.Context())
|
|
require.NotNil(t, builder.Build(stats.Result{}, md).Data)
|
|
})
|
|
|
|
t.Run("successful conversion of matrix data", func(t *testing.T) {
|
|
colTs := semconv.ColumnIdentTimestamp
|
|
colVal := semconv.ColumnIdentValue
|
|
colInst := semconv.NewIdentifier("instance", types.ColumnTypeMetadata, types.Loki.String)
|
|
colJob := semconv.NewIdentifier("job", types.ColumnTypeMetadata, types.Loki.String)
|
|
|
|
schema := arrow.NewSchema(
|
|
[]arrow.Field{
|
|
semconv.FieldFromIdent(colTs, false),
|
|
semconv.FieldFromIdent(colVal, false),
|
|
semconv.FieldFromIdent(colInst, false),
|
|
semconv.FieldFromIdent(colJob, false),
|
|
},
|
|
nil,
|
|
)
|
|
rows := arrowtest.Rows{
|
|
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colVal.FQN(): float64(42), colInst.FQN(): "localhost:9090", colJob.FQN(): "prometheus"},
|
|
{colTs.FQN(): time.Unix(0, 1620000001000000000).UTC(), colVal.FQN(): float64(43), colInst.FQN(): "localhost:9090", colJob.FQN(): "prometheus"},
|
|
{colTs.FQN(): time.Unix(0, 1620000002000000000).UTC(), colVal.FQN(): float64(44), colInst.FQN(): "localhost:9090", colJob.FQN(): "prometheus"},
|
|
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colVal.FQN(): float64(23), colInst.FQN(): "localhost:9100", colJob.FQN(): "node-exporter"},
|
|
{colTs.FQN(): time.Unix(0, 1620000001000000000).UTC(), colVal.FQN(): float64(24), colInst.FQN(): "localhost:9100", colJob.FQN(): "node-exporter"},
|
|
{colTs.FQN(): time.Unix(0, 1620000002000000000).UTC(), colVal.FQN(): float64(25), colInst.FQN(): "localhost:9100", colJob.FQN(): "node-exporter"},
|
|
}
|
|
|
|
record := rows.Record(memory.DefaultAllocator, schema)
|
|
|
|
pipeline := executor.NewBufferedPipeline(record)
|
|
defer pipeline.Close()
|
|
|
|
builder := newMatrixResultBuilder()
|
|
err := collectResult(context.Background(), pipeline, builder)
|
|
|
|
require.NoError(t, err)
|
|
require.Equal(t, 6, builder.Len())
|
|
|
|
md, _ := metadata.NewContext(t.Context())
|
|
result := builder.Build(stats.Result{}, md)
|
|
matrix := result.Data.(promql.Matrix)
|
|
require.Equal(t, 2, len(matrix))
|
|
|
|
expected := promql.Matrix{
|
|
{
|
|
Metric: labels.FromStrings("instance", "localhost:9090", "job", "prometheus"),
|
|
Floats: []promql.FPoint{
|
|
{T: int64(1620000000000), F: 42.0},
|
|
{T: int64(1620000001000), F: 43.0},
|
|
{T: int64(1620000002000), F: 44.0},
|
|
},
|
|
},
|
|
{
|
|
Metric: labels.FromStrings("instance", "localhost:9100", "job", "node-exporter"),
|
|
Floats: []promql.FPoint{
|
|
{T: int64(1620000000000), F: 23.0},
|
|
{T: int64(1620000001000), F: 24.0},
|
|
{T: int64(1620000002000), F: 25.0},
|
|
},
|
|
},
|
|
}
|
|
require.Equal(t, expected, matrix)
|
|
})
|
|
}
|
|
|