Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/engine/engine_test.go

178 lines
6.7 KiB

package engine
import (
"context"
"testing"
"time"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/engine/executor"
"github.com/grafana/loki/v3/pkg/engine/internal/datatype"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logqlmodel"
"github.com/grafana/loki/pkg/push"
)
// createRecord builds an arrow.Record with the given schema from row-oriented
// test data. Each inner slice of data is one row; its values are appended
// positionally to the record builder's fields. A nil value is appended as a
// null entry.
//
// Non-nil values must have the Go type that matches the column's builder:
// bool, string, uint64, int64, float64 or arrow.Timestamp. A mismatching value
// panics on the type assertion, and a column backed by any other builder type
// fails the test immediately.
//
// Callers in this file release the returned record once they are done with it.
func createRecord(t *testing.T, schema *arrow.Schema, data [][]interface{}) arrow.Record {
	// Attribute failures to the calling test rather than to this helper.
	t.Helper()

	mem := memory.NewGoAllocator()
	builder := array.NewRecordBuilder(mem, schema)
	defer builder.Release()

	for _, row := range data {
		for j, val := range row {
			if val == nil {
				builder.Field(j).AppendNull()
				continue
			}

			// Bind the concrete builder once so each case appends through the
			// builder type it actually matched.
			switch fb := builder.Field(j).(type) {
			case *array.BooleanBuilder:
				fb.Append(val.(bool))
			case *array.StringBuilder:
				fb.Append(val.(string))
			case *array.Uint64Builder:
				fb.Append(val.(uint64))
			case *array.Int64Builder:
				fb.Append(val.(int64))
			case *array.Float64Builder:
				fb.Append(val.(float64))
			case *array.TimestampBuilder:
				fb.Append(val.(arrow.Timestamp))
			default:
				t.Fatal("invalid field type")
			}
		}
	}

	return builder.NewRecord()
}
// TestConvertArrowRecordsToLokiResult feeds Arrow records through a buffered
// pipeline into collectResult and checks which rows end up in the result
// builder, and how they are grouped into streams.
func TestConvertArrowRecordsToLokiResult(t *testing.T) {
	labelMD := datatype.ColumnMetadata(types.ColumnTypeLabel, datatype.String)
	structuredMD := datatype.ColumnMetadata(types.ColumnTypeMetadata, datatype.String)

	// Every row has exactly one null column (message, timestamp, or label),
	// and none of them is expected to be collected.
	t.Run("rows without log line, timestamp, or labels are ignored", func(t *testing.T) {
		fields := []arrow.Field{
			{Name: types.ColumnNameBuiltinTimestamp, Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: datatype.ColumnMetadataBuiltinTimestamp},
			{Name: types.ColumnNameBuiltinMessage, Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadataBuiltinMessage},
			{Name: "env", Type: arrow.BinaryTypes.String, Metadata: labelMD},
		}
		rows := [][]interface{}{
			{arrow.Timestamp(1620000000000000001), nil, "prod"},
			{nil, "log line", "prod"},
			{arrow.Timestamp(1620000000000000003), "log line", nil},
		}

		rec := createRecord(t, arrow.NewSchema(fields, nil), rows)
		defer rec.Release()

		p := executor.NewBufferedPipeline(rec)
		defer p.Close()

		rb := newResultBuilder()
		require.NoError(t, collectResult(context.Background(), p, rb))
		require.Equal(t, 0, rb.len(), "expected no entries to be collected")
	})

	// The schema fields carry no column metadata at all; fully populated rows
	// are still expected to yield zero entries.
	t.Run("fields without metadata are ignored", func(t *testing.T) {
		fields := []arrow.Field{
			{Name: types.ColumnNameBuiltinTimestamp, Type: arrow.FixedWidthTypes.Timestamp_ns},
			{Name: types.ColumnNameBuiltinMessage, Type: arrow.BinaryTypes.String},
			{Name: "env", Type: arrow.BinaryTypes.String},
		}
		rows := [][]interface{}{
			{arrow.Timestamp(1620000000000000001), "log line 1", "prod"},
			{arrow.Timestamp(1620000000000000002), "log line 2", "prod"},
			{arrow.Timestamp(1620000000000000003), "log line 3", "prod"},
		}

		rec := createRecord(t, arrow.NewSchema(fields, nil), rows)
		defer rec.Release()

		p := executor.NewBufferedPipeline(rec)
		defer p.Close()

		rb := newResultBuilder()
		require.NoError(t, collectResult(context.Background(), p, rb))
		require.Equal(t, 0, rb.len(), "expected no entries to be collected")
	})

	// Five complete rows over three distinct (env, namespace) pairs are
	// expected to produce five entries grouped into three streams.
	t.Run("successful conversion of labels, log line, timestamp, and structured metadata ", func(t *testing.T) {
		fields := []arrow.Field{
			{Name: types.ColumnNameBuiltinTimestamp, Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: datatype.ColumnMetadataBuiltinTimestamp},
			{Name: types.ColumnNameBuiltinMessage, Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadataBuiltinMessage},
			{Name: "env", Type: arrow.BinaryTypes.String, Metadata: labelMD},
			{Name: "namespace", Type: arrow.BinaryTypes.String, Metadata: labelMD},
			{Name: "traceID", Type: arrow.BinaryTypes.String, Metadata: structuredMD},
		}
		rows := [][]interface{}{
			{arrow.Timestamp(1620000000000000001), "log line 1", "dev", "loki-dev-001", "860e403fcf754312"},
			{arrow.Timestamp(1620000000000000002), "log line 2", "prod", "loki-prod-001", "46ce02549441e41c"},
			{arrow.Timestamp(1620000000000000003), "log line 3", "dev", "loki-dev-002", "61330481e1e59b18"},
			{arrow.Timestamp(1620000000000000004), "log line 4", "prod", "loki-prod-001", "40e50221e284b9d2"},
			{arrow.Timestamp(1620000000000000005), "log line 5", "dev", "loki-dev-002", "0cf883f112ad239b"},
		}

		rec := createRecord(t, arrow.NewSchema(fields, nil), rows)
		defer rec.Release()

		p := executor.NewBufferedPipeline(rec)
		defer p.Close()

		rb := newResultBuilder()
		require.NoError(t, collectResult(context.Background(), p, rb))
		require.Equal(t, 5, rb.len())

		got := rb.build().Data.(logqlmodel.Streams)
		require.Equal(t, 3, got.Len())

		// streamLabels renders the expected label string for one stream.
		streamLabels := func(env, namespace string) string {
			return labels.FromStrings("env", env, "namespace", namespace).String()
		}
		// entry builds one expected log entry with a single traceID
		// structured-metadata pair.
		entry := func(line string, nanos int64, traceID string) logproto.Entry {
			return logproto.Entry{
				Line:               line,
				Timestamp:          time.Unix(0, nanos),
				StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", traceID)),
			}
		}

		want := logqlmodel.Streams{
			push.Stream{
				Labels: streamLabels("dev", "loki-dev-001"),
				Entries: []logproto.Entry{
					entry("log line 1", 1620000000000000001, "860e403fcf754312"),
				},
			},
			push.Stream{
				Labels: streamLabels("prod", "loki-prod-001"),
				Entries: []logproto.Entry{
					entry("log line 2", 1620000000000000002, "46ce02549441e41c"),
					entry("log line 4", 1620000000000000004, "40e50221e284b9d2"),
				},
			},
			push.Stream{
				Labels: streamLabels("dev", "loki-dev-002"),
				Entries: []logproto.Entry{
					entry("log line 3", 1620000000000000003, "61330481e1e59b18"),
					entry("log line 5", 1620000000000000005, "0cf883f112ad239b"),
				},
			},
		}
		require.Equal(t, want, got)
	})
}