// loki/pkg/engine/compat_test.go
package engine

import (
	"context"
	"testing"
	"time"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/promql"
	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/v3/pkg/engine/internal/executor"
	"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
	"github.com/grafana/loki/v3/pkg/engine/internal/types"
	"github.com/grafana/loki/v3/pkg/logproto"
	"github.com/grafana/loki/v3/pkg/logqlmodel"
	"github.com/grafana/loki/v3/pkg/logqlmodel/metadata"
	"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
	"github.com/grafana/loki/v3/pkg/util/arrowtest"

	"github.com/grafana/loki/pkg/push"
)
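
// TestStreamsResultBuilder exercises streamsResultBuilder, which converts
// Arrow records produced by the engine into logqlmodel.Streams. Based on the
// expectations below: rows missing a timestamp, log line, or any labels are
// skipped, entries are grouped into streams by their label set, and streams
// are returned sorted by their label string. The second constructor argument
// appears to toggle label categorization (see the last subtest): when false,
// structured metadata is folded into the stream labels; when true, it is kept
// only on the individual entries.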
func TestStreamsResultBuilder(t *testing.T) {
	t.Run("empty builder returns non-nil result", func(t *testing.T) {
		builder := newStreamsResultBuilder(logproto.BACKWARD, false)
		md, _ := metadata.NewContext(t.Context())
		require.NotNil(t, builder.Build(stats.Result{}, md).Data)
	})
t.Run("rows without log line, timestamp, or labels are ignored", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colMsg := semconv.ColumnIdentMessage
colEnv := semconv.NewIdentifier("env", types.ColumnTypeMetadata, types.Loki.String)
schema := arrow.NewSchema(
[]arrow.Field{
semconv.FieldFromIdent(colTs, true), // in practice timestamp is not nullable!
semconv.FieldFromIdent(colMsg, true), // in practice message is not nullable!
semconv.FieldFromIdent(colEnv, true),
},
nil,
)
rows := arrowtest.Rows{
{
colTs.FQN(): time.Unix(0, 1620000000000000001).UTC(),
colMsg.FQN(): nil,
colEnv.FQN(): "prod",
},
{
colTs.FQN(): nil,
colMsg.FQN(): "log line",
colEnv.FQN(): "prod",
},
{
colTs.FQN(): time.Unix(0, 1620000000000000003).UTC(),
colMsg.FQN(): "log line",
colEnv.FQN(): nil,
},
}
record := rows.Record(memory.DefaultAllocator, schema)
pipeline := executor.NewBufferedPipeline(record)
defer pipeline.Close()
builder := newStreamsResultBuilder(logproto.BACKWARD, false)
err := collectResult(context.Background(), pipeline, builder)
require.NoError(t, err)
require.Equal(t, 0, builder.Len(), "expected no entries to be collected")
})
t.Run("successful conversion of labels, log line, timestamp, and structured metadata ", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colMsg := semconv.ColumnIdentMessage
colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
colNs := semconv.NewIdentifier("namespace", types.ColumnTypeLabel, types.Loki.String)
colTid := semconv.NewIdentifier("traceID", types.ColumnTypeMetadata, types.Loki.String)
schema := arrow.NewSchema(
[]arrow.Field{
semconv.FieldFromIdent(colTs, false),
semconv.FieldFromIdent(colMsg, false),
semconv.FieldFromIdent(colEnv, true),
semconv.FieldFromIdent(colNs, true),
semconv.FieldFromIdent(colTid, true),
},
nil,
)
rows := arrowtest.Rows{
{
colTs.FQN(): nil,
colMsg.FQN(): "log line 0 (must be skipped)",
colEnv.FQN(): "dev",
colNs.FQN(): "loki-dev-001",
colTid.FQN(): "860e403fcf754312",
},
{
colTs.FQN(): time.Unix(0, 1620000000000000001).UTC(),
colMsg.FQN(): "log line 1",
colEnv.FQN(): "dev",
colNs.FQN(): "loki-dev-001",
colTid.FQN(): "860e403fcf754312",
},
{
colTs.FQN(): time.Unix(0, 1620000000000000002).UTC(),
colMsg.FQN(): "log line 2",
colEnv.FQN(): "prod",
colNs.FQN(): "loki-prod-001",
colTid.FQN(): "46ce02549441e41c",
},
{
colTs.FQN(): time.Unix(0, 1620000000000000003).UTC(),
colMsg.FQN(): "log line 3",
colEnv.FQN(): "dev",
colNs.FQN(): "loki-dev-002",
colTid.FQN(): "61330481e1e59b18",
},
{
colTs.FQN(): time.Unix(0, 1620000000000000004).UTC(),
colMsg.FQN(): "log line 4",
colEnv.FQN(): "prod",
colNs.FQN(): "loki-prod-001",
colTid.FQN(): "40e50221e284b9d2",
},
{
colTs.FQN(): time.Unix(0, 1620000000000000005).UTC(),
colMsg.FQN(): "log line 5",
colEnv.FQN(): "dev",
colNs.FQN(): "loki-dev-002",
colTid.FQN(): "0cf883f112ad239b",
},
{
colTs.FQN(): time.Unix(0, 1620000000000000006).UTC(),
colMsg.FQN(): "log line 6",
colEnv.FQN(): "dev",
colNs.FQN(): nil,
colTid.FQN(): "9de325g124ad230b",
},
}
record := rows.Record(memory.DefaultAllocator, schema)
pipeline := executor.NewBufferedPipeline(record)
defer pipeline.Close()
builder := newStreamsResultBuilder(logproto.BACKWARD, false)
err := collectResult(context.Background(), pipeline, builder)
require.NoError(t, err)
require.Equal(t, 6, builder.Len())
md, _ := metadata.NewContext(t.Context())
result := builder.Build(stats.Result{}, md)
require.Equal(t, 6, result.Data.(logqlmodel.Streams).Len())
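
		// With label categorization disabled, the traceID metadata becomes
		// part of each stream's label set, so every entry lands in its own
		// stream. Streams are expected sorted by their label string, not by
		// entry timestamp.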
		expected := logqlmodel.Streams{
			push.Stream{
				Labels: labels.FromStrings("env", "dev", "namespace", "loki-dev-001", "traceID", "860e403fcf754312").String(),
				Entries: []logproto.Entry{
					{Line: "log line 1", Timestamp: time.Unix(0, 1620000000000000001), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "860e403fcf754312")), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
				},
			},
			push.Stream{
				Labels: labels.FromStrings("env", "dev", "namespace", "loki-dev-002", "traceID", "0cf883f112ad239b").String(),
				Entries: []logproto.Entry{
					{Line: "log line 5", Timestamp: time.Unix(0, 1620000000000000005), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "0cf883f112ad239b")), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
				},
			},
			push.Stream{
				Labels: labels.FromStrings("env", "dev", "namespace", "loki-dev-002", "traceID", "61330481e1e59b18").String(),
				Entries: []logproto.Entry{
					{Line: "log line 3", Timestamp: time.Unix(0, 1620000000000000003), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "61330481e1e59b18")), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
				},
			},
			push.Stream{
				Labels: labels.FromStrings("env", "dev", "traceID", "9de325g124ad230b").String(),
				Entries: []logproto.Entry{
					{Line: "log line 6", Timestamp: time.Unix(0, 1620000000000000006), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "9de325g124ad230b")), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
				},
			},
			push.Stream{
				Labels: labels.FromStrings("env", "prod", "namespace", "loki-prod-001", "traceID", "40e50221e284b9d2").String(),
				Entries: []logproto.Entry{
					{Line: "log line 4", Timestamp: time.Unix(0, 1620000000000000004), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "40e50221e284b9d2")), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
				},
			},
			push.Stream{
				Labels: labels.FromStrings("env", "prod", "namespace", "loki-prod-001", "traceID", "46ce02549441e41c").String(),
				Entries: []logproto.Entry{
					{Line: "log line 2", Timestamp: time.Unix(0, 1620000000000000002), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "46ce02549441e41c")), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
				},
			},
		}
		require.Equal(t, expected, result.Data.(logqlmodel.Streams))
	})
t.Run("multiple records with different streams are accumulated correctly", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colMsg := semconv.ColumnIdentMessage
colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
schema := arrow.NewSchema(
[]arrow.Field{
semconv.FieldFromIdent(colTs, false),
semconv.FieldFromIdent(colMsg, false),
semconv.FieldFromIdent(colEnv, false),
},
nil,
)
// First record: prod and dev streams
rows1 := arrowtest.Rows{
{
colTs.FQN(): time.Unix(0, 1620000000000000001).UTC(),
colMsg.FQN(): "log line 1",
colEnv.FQN(): "prod",
},
{
colTs.FQN(): time.Unix(0, 1620000000000000002).UTC(),
colMsg.FQN(): "log line 2",
colEnv.FQN(): "dev",
},
}
record1 := rows1.Record(memory.DefaultAllocator, schema)
defer record1.Release()
// Second record: prod and staging streams
rows2 := arrowtest.Rows{
{
colTs.FQN(): time.Unix(0, 1620000000000000003).UTC(),
colMsg.FQN(): "log line 3",
colEnv.FQN(): "prod",
},
{
colTs.FQN(): time.Unix(0, 1620000000000000004).UTC(),
colMsg.FQN(): "log line 4",
colEnv.FQN(): "staging",
},
}
record2 := rows2.Record(memory.DefaultAllocator, schema)
defer record2.Release()
builder := newStreamsResultBuilder(logproto.FORWARD, false)
// Collect first record
builder.CollectRecord(record1)
require.Equal(t, 2, builder.Len(), "should have 2 entries after first record")
// Collect second record
builder.CollectRecord(record2)
require.Equal(t, 4, builder.Len(), "should have 4 entries total after second record")
md, _ := metadata.NewContext(t.Context())
result := builder.Build(stats.Result{}, md)
streams := result.Data.(logqlmodel.Streams)
// Note: 3 unique streams (dev, prod, staging), but 4 total entries
// The prod stream has 2 entries (one from each record)
require.Equal(t, 3, len(streams), "should have 3 unique streams")
// Verify stream grouping - prod stream should have entries from both records
expected := logqlmodel.Streams{
push.Stream{
Labels: labels.FromStrings("env", "dev").String(),
Entries: []logproto.Entry{
{Line: "log line 2", Timestamp: time.Unix(0, 1620000000000000002), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.Labels{}), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
},
},
push.Stream{
Labels: labels.FromStrings("env", "prod").String(),
Entries: []logproto.Entry{
{Line: "log line 1", Timestamp: time.Unix(0, 1620000000000000001), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.Labels{}), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
{Line: "log line 3", Timestamp: time.Unix(0, 1620000000000000003), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.Labels{}), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
},
},
push.Stream{
Labels: labels.FromStrings("env", "staging").String(),
Entries: []logproto.Entry{
{Line: "log line 4", Timestamp: time.Unix(0, 1620000000000000004), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.Labels{}), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
},
},
}
require.Equal(t, expected, streams)
})
t.Run("buffer reuse with varying record sizes", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colMsg := semconv.ColumnIdentMessage
colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
schema := arrow.NewSchema(
[]arrow.Field{
semconv.FieldFromIdent(colTs, false),
semconv.FieldFromIdent(colMsg, false),
semconv.FieldFromIdent(colEnv, false),
},
nil,
)
builder := newStreamsResultBuilder(logproto.BACKWARD, false)
// First record: 5 rows (buffer grows to 5)
rows1 := make(arrowtest.Rows, 5)
for i := 0; i < 5; i++ {
rows1[i] = arrowtest.Row{
colTs.FQN(): time.Unix(0, int64(1620000000000000001+i)).UTC(),
colMsg.FQN(): "log line",
colEnv.FQN(): "prod",
}
}
record1 := rows1.Record(memory.DefaultAllocator, schema)
builder.CollectRecord(record1)
record1.Release()
require.Equal(t, 5, builder.Len())
require.Equal(t, 5, len(builder.rowBuilders), "buffer should have 5 rowBuilders")
// Second record: 2 rows (buffer shrinks to 2)
rows2 := make(arrowtest.Rows, 2)
for i := 0; i < 2; i++ {
rows2[i] = arrowtest.Row{
colTs.FQN(): time.Unix(0, int64(1620000000000000010+i)).UTC(),
colMsg.FQN(): "log line",
colEnv.FQN(): "dev",
}
}
record2 := rows2.Record(memory.DefaultAllocator, schema)
builder.CollectRecord(record2)
record2.Release()
require.Equal(t, 7, builder.Len())
require.Equal(t, 2, len(builder.rowBuilders), "buffer should shrink to 2 rowBuilders")
// Third record: 10 rows (buffer grows to 10)
rows3 := make(arrowtest.Rows, 10)
for i := 0; i < 10; i++ {
rows3[i] = arrowtest.Row{
colTs.FQN(): time.Unix(0, int64(1620000000000000020+i)).UTC(),
colMsg.FQN(): "log line",
colEnv.FQN(): "staging",
}
}
record3 := rows3.Record(memory.DefaultAllocator, schema)
builder.CollectRecord(record3)
record3.Release()
require.Equal(t, 17, builder.Len())
require.Equal(t, 10, len(builder.rowBuilders), "buffer should grow to 10 rowBuilders")
// Verify all rowBuilders are properly initialized
for i := 0; i < len(builder.rowBuilders); i++ {
require.NotNil(t, builder.rowBuilders[i].lbsBuilder, "lbsBuilder should be initialized")
require.NotNil(t, builder.rowBuilders[i].metadataBuilder, "metadataBuilder should be initialized")
require.NotNil(t, builder.rowBuilders[i].parsedBuilder, "parsedBuilder should be initialized")
require.Equal(t, 0, len(builder.rowBuilders[i].parsedEmptyKeys), "parsedEmptyKeys should be empty")
}
})
t.Run("empty records mixed with valid records", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colMsg := semconv.ColumnIdentMessage
colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
schema := arrow.NewSchema(
[]arrow.Field{
semconv.FieldFromIdent(colTs, false),
semconv.FieldFromIdent(colMsg, false),
semconv.FieldFromIdent(colEnv, false),
},
nil,
)
builder := newStreamsResultBuilder(logproto.BACKWARD, false)
// First record: 3 valid rows
rows1 := make(arrowtest.Rows, 3)
for i := 0; i < 3; i++ {
rows1[i] = arrowtest.Row{
colTs.FQN(): time.Unix(0, int64(1620000000000000001+i)).UTC(),
colMsg.FQN(): "log line",
colEnv.FQN(): "prod",
}
}
record1 := rows1.Record(memory.DefaultAllocator, schema)
builder.CollectRecord(record1)
record1.Release()
require.Equal(t, 3, builder.Len())
// Second record: empty (0 rows)
rows2 := arrowtest.Rows{}
record2 := rows2.Record(memory.DefaultAllocator, schema)
builder.CollectRecord(record2)
record2.Release()
require.Equal(t, 3, builder.Len(), "empty record should not change count")
// Third record: 2 valid rows
rows3 := make(arrowtest.Rows, 2)
for i := 0; i < 2; i++ {
rows3[i] = arrowtest.Row{
colTs.FQN(): time.Unix(0, int64(1620000000000000010+i)).UTC(),
colMsg.FQN(): "log line",
colEnv.FQN(): "dev",
}
}
record3 := rows3.Record(memory.DefaultAllocator, schema)
builder.CollectRecord(record3)
record3.Release()
require.Equal(t, 5, builder.Len(), "should have 5 total entries")
// Verify final result
md, _ := metadata.NewContext(t.Context())
result := builder.Build(stats.Result{}, md)
streams := result.Data.(logqlmodel.Streams)
// Note: 2 unique streams (prod with 3 entries, dev with 2 entries) = 5 total entries
require.Equal(t, 2, len(streams), "should have 2 unique streams")
// Verify the streams have the correct number of entries
var totalEntries int
for _, stream := range streams {
totalEntries += len(stream.Entries)
}
require.Equal(t, 5, totalEntries, "should have 5 total entries across both streams")
})
t.Run("parsed empty values are added to the parsed labels", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colMsg := semconv.ColumnIdentMessage
colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
colMetadata := semconv.NewIdentifier("metadata", types.ColumnTypeMetadata, types.Loki.String)
colParsedA := semconv.NewIdentifier("Aparsed", types.ColumnTypeParsed, types.Loki.String)
colParsedZ := semconv.NewIdentifier("Zparsed", types.ColumnTypeParsed, types.Loki.String)
schema := arrow.NewSchema(
[]arrow.Field{
semconv.FieldFromIdent(colTs, false),
semconv.FieldFromIdent(colMsg, false),
semconv.FieldFromIdent(colEnv, false),
semconv.FieldFromIdent(colMetadata, false),
semconv.FieldFromIdent(colParsedA, false),
semconv.FieldFromIdent(colParsedZ, false),
},
nil,
)
rows := arrowtest.Rows{
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colMsg.FQN(): "log line", colEnv.FQN(): "prod", colMetadata.FQN(): "md value", colParsedA.FQN(): "A", colParsedZ.FQN(): "Z"},
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colMsg.FQN(): "log line", colEnv.FQN(): "prod", colMetadata.FQN(): "", colParsedA.FQN(): "", colParsedZ.FQN(): ""},
}
record := rows.Record(memory.DefaultAllocator, schema)
builder := newStreamsResultBuilder(logproto.BACKWARD, false)
builder.CollectRecord(record)
record.Release()
require.Equal(t, 2, builder.Len())
md, _ := metadata.NewContext(t.Context())
result := builder.Build(stats.Result{}, md)
streams := result.Data.(logqlmodel.Streams)
require.Equal(t, 2, len(streams), "should have 2 unique streams")
expected := logqlmodel.Streams{
push.Stream{
Labels: labels.FromStrings("Aparsed", "", "env", "prod", "Zparsed", "").String(),
Entries: []logproto.Entry{
{Line: "log line", Timestamp: time.Unix(0, 1620000000000000000), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.Labels{}), Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("Aparsed", "", "Zparsed", ""))},
},
},
push.Stream{
Labels: labels.FromStrings("Aparsed", "A", "env", "prod", "metadata", "md value", "Zparsed", "Z").String(),
Entries: []logproto.Entry{
{Line: "log line", Timestamp: time.Unix(0, 1620000000000000000), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("metadata", "md value")), Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("Aparsed", "A", "Zparsed", "Z"))},
},
},
}
require.Equal(t, expected, streams)
})
t.Run("categorize labels does not consider metadata when building output streams", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colMsg := semconv.ColumnIdentMessage
colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
colMetadata := semconv.NewIdentifier("metadata", types.ColumnTypeMetadata, types.Loki.String)
schema := arrow.NewSchema(
[]arrow.Field{
semconv.FieldFromIdent(colTs, false),
semconv.FieldFromIdent(colMsg, false),
semconv.FieldFromIdent(colEnv, false),
semconv.FieldFromIdent(colMetadata, false),
},
nil,
)
rows := arrowtest.Rows{
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colMsg.FQN(): "log line", colEnv.FQN(): "prod", colMetadata.FQN(): "a md value"},
{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colMsg.FQN(): "log line", colEnv.FQN(): "prod", colMetadata.FQN(): "another md value"},
}
record := rows.Record(memory.DefaultAllocator, schema)
builder := newStreamsResultBuilder(logproto.BACKWARD, true)
builder.CollectRecord(record)
record.Release()
require.Equal(t, 2, builder.Len())
md, _ := metadata.NewContext(t.Context())
result := builder.Build(stats.Result{}, md)
streams := result.Data.(logqlmodel.Streams)
require.Equal(t, 1, len(streams), "should have 1 unique stream")
expected := logqlmodel.Streams{
push.Stream{
Labels: labels.FromStrings("env", "prod").String(),
Entries: []logproto.Entry{
{Line: "log line", Timestamp: time.Unix(0, 1620000000000000000), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("metadata", "a md value")), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
{Line: "log line", Timestamp: time.Unix(0, 1620000000000000000), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("metadata", "another md value")), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
},
},
}
require.Equal(t, expected, streams)
})
}
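
// TestVectorResultBuilder exercises vectorResultBuilder, which converts Arrow
// records into a promql.Vector. Per the expectations below, sample timestamps
// are emitted in milliseconds (the Arrow column carries nanosecond-precision
// timestamps), rows missing a timestamp or value are skipped, and samples are
// returned sorted by their metric labels.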
func TestVectorResultBuilder(t *testing.T) {
	t.Run("empty builder returns non-nil result", func(t *testing.T) {
		builder := newVectorResultBuilder()
		md, _ := metadata.NewContext(t.Context())
		require.NotNil(t, builder.Build(stats.Result{}, md).Data)
	})

	t.Run("successful conversion of vector data", func(t *testing.T) {
		colTs := semconv.ColumnIdentTimestamp
		colVal := semconv.ColumnIdentValue
		colInst := semconv.NewIdentifier("instance", types.ColumnTypeMetadata, types.Loki.String)
		colJob := semconv.NewIdentifier("job", types.ColumnTypeMetadata, types.Loki.String)

		schema := arrow.NewSchema(
			[]arrow.Field{
				semconv.FieldFromIdent(colTs, false),
				semconv.FieldFromIdent(colVal, false),
				semconv.FieldFromIdent(colInst, false),
				semconv.FieldFromIdent(colJob, false),
			},
			nil,
		)

		rows := arrowtest.Rows{
			{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colVal.FQN(): float64(42), colInst.FQN(): "localhost:9090", colJob.FQN(): "prometheus"},
			{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colVal.FQN(): float64(23), colInst.FQN(): "localhost:9100", colJob.FQN(): "node-exporter"},
			{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colVal.FQN(): float64(32), colInst.FQN(): nil, colJob.FQN(): "node-exporter"},
			{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colVal.FQN(): float64(15), colInst.FQN(): "localhost:9100", colJob.FQN(): "prometheus"},
		}
		record := rows.Record(memory.DefaultAllocator, schema)
		pipeline := executor.NewBufferedPipeline(record)
		defer pipeline.Close()

		builder := newVectorResultBuilder()
		err := collectResult(context.Background(), pipeline, builder)
		require.NoError(t, err)
		require.Equal(t, 4, builder.Len())

		md, _ := metadata.NewContext(t.Context())
		result := builder.Build(stats.Result{}, md)
		vector := result.Data.(promql.Vector)
		require.Equal(t, 4, len(vector))
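
		// Samples are expected in label-sorted order rather than input order,
		// and the nanosecond timestamp 1620000000000000000 becomes the
		// millisecond value 1620000000000. The row with a nil instance keeps
		// only its job label.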
		expected := promql.Vector{
			{
				T: int64(1620000000000),
				F: 42.0,
				Metric: labels.FromStrings("instance", "localhost:9090", "job", "prometheus"),
			},
			{
				T: int64(1620000000000),
				F: 23.0,
				Metric: labels.FromStrings("instance", "localhost:9100", "job", "node-exporter"),
			},
			{
				T: int64(1620000000000),
				F: 15.0,
				Metric: labels.FromStrings("instance", "localhost:9100", "job", "prometheus"),
			},
			{
				T: int64(1620000000000),
				F: 32.0,
				Metric: labels.FromStrings("job", "node-exporter"),
			},
		}
		require.Equal(t, expected, vector)
	})

	// TODO(ashwanth): also enforce that grouping labels are all present?
	t.Run("rows without timestamp or value are ignored", func(t *testing.T) {
		colTs := semconv.ColumnIdentTimestamp
		colVal := semconv.ColumnIdentValue
		colInst := semconv.NewIdentifier("instance", types.ColumnTypeMetadata, types.Loki.String)

		schema := arrow.NewSchema(
			[]arrow.Field{
				semconv.FieldFromIdent(colTs, false),
				semconv.FieldFromIdent(colVal, false),
				semconv.FieldFromIdent(colInst, false),
			},
			nil,
		)

		rows := arrowtest.Rows{
			{colTs.FQN(): nil, colVal.FQN(): float64(42), colInst.FQN(): "localhost:9090"},
			{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colVal.FQN(): nil, colInst.FQN(): "localhost:9100"},
		}
		record := rows.Record(memory.DefaultAllocator, schema)
		pipeline := executor.NewBufferedPipeline(record)
		defer pipeline.Close()

		builder := newVectorResultBuilder()
		err := collectResult(context.Background(), pipeline, builder)
		require.NoError(t, err)
		require.Equal(t, 0, builder.Len(), "expected no samples to be collected")
	})
}
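
// TestMatrixResultBuilder exercises matrixResultBuilder, which converts Arrow
// records into a promql.Matrix, grouping points into one series per unique
// label set, with point timestamps in milliseconds.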
func TestMatrixResultBuilder(t *testing.T) {
	t.Run("empty builder returns non-nil result", func(t *testing.T) {
		builder := newMatrixResultBuilder()
		md, _ := metadata.NewContext(t.Context())
		require.NotNil(t, builder.Build(stats.Result{}, md).Data)
	})

	t.Run("successful conversion of matrix data", func(t *testing.T) {
		colTs := semconv.ColumnIdentTimestamp
		colVal := semconv.ColumnIdentValue
		colInst := semconv.NewIdentifier("instance", types.ColumnTypeMetadata, types.Loki.String)
		colJob := semconv.NewIdentifier("job", types.ColumnTypeMetadata, types.Loki.String)

		schema := arrow.NewSchema(
			[]arrow.Field{
				semconv.FieldFromIdent(colTs, false),
				semconv.FieldFromIdent(colVal, false),
				semconv.FieldFromIdent(colInst, false),
				semconv.FieldFromIdent(colJob, false),
			},
			nil,
		)

		rows := arrowtest.Rows{
			{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colVal.FQN(): float64(42), colInst.FQN(): "localhost:9090", colJob.FQN(): "prometheus"},
			{colTs.FQN(): time.Unix(0, 1620000001000000000).UTC(), colVal.FQN(): float64(43), colInst.FQN(): "localhost:9090", colJob.FQN(): "prometheus"},
			{colTs.FQN(): time.Unix(0, 1620000002000000000).UTC(), colVal.FQN(): float64(44), colInst.FQN(): "localhost:9090", colJob.FQN(): "prometheus"},
			{colTs.FQN(): time.Unix(0, 1620000000000000000).UTC(), colVal.FQN(): float64(23), colInst.FQN(): "localhost:9100", colJob.FQN(): "node-exporter"},
			{colTs.FQN(): time.Unix(0, 1620000001000000000).UTC(), colVal.FQN(): float64(24), colInst.FQN(): "localhost:9100", colJob.FQN(): "node-exporter"},
			{colTs.FQN(): time.Unix(0, 1620000002000000000).UTC(), colVal.FQN(): float64(25), colInst.FQN(): "localhost:9100", colJob.FQN(): "node-exporter"},
		}
		record := rows.Record(memory.DefaultAllocator, schema)
		pipeline := executor.NewBufferedPipeline(record)
		defer pipeline.Close()

		builder := newMatrixResultBuilder()
		err := collectResult(context.Background(), pipeline, builder)
		require.NoError(t, err)
		require.Equal(t, 6, builder.Len())

		md, _ := metadata.NewContext(t.Context())
		result := builder.Build(stats.Result{}, md)
		matrix := result.Data.(promql.Matrix)
		require.Equal(t, 2, len(matrix))

		expected := promql.Matrix{
			{
				Metric: labels.FromStrings("instance", "localhost:9090", "job", "prometheus"),
				Floats: []promql.FPoint{
					{T: int64(1620000000000), F: 42.0},
					{T: int64(1620000001000), F: 43.0},
					{T: int64(1620000002000), F: 44.0},
				},
			},
			{
				Metric: labels.FromStrings("instance", "localhost:9100", "job", "node-exporter"),
				Floats: []promql.FPoint{
					{T: int64(1620000000000), F: 23.0},
					{T: int64(1620000001000), F: 24.0},
					{T: int64(1620000002000), F: 25.0},
				},
			},
		}
		require.Equal(t, expected, matrix)
	})
}
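
// Note: each subtest above repeats the same shape: build a record from
// arrowtest.Rows, wrap it in a buffered pipeline, and drain it into a result
// builder. A helper along the following lines could cut the repetition. This
// is only a sketch: resultBuilder is a hypothetical interface name standing in
// for whatever collectResult actually accepts, and the helper is not part of
// the package under test.
//
//	func collectRows(t *testing.T, builder resultBuilder, schema *arrow.Schema, rows arrowtest.Rows) {
//		t.Helper()
//		record := rows.Record(memory.DefaultAllocator, schema)
//		pipeline := executor.NewBufferedPipeline(record)
//		defer pipeline.Close()
//		require.NoError(t, collectResult(context.Background(), pipeline, builder))
//	}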