Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/engine/internal/executor/parse_linefmt_test.go

126 lines
3.9 KiB

package executor
import (
"testing"
"time"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
)
// TestLinefmtParser_Process runs a table of linefmt templates through a
// formatter created by NewFormatter. Every case builds a one-row record batch
// (message, timestamp, and a "namespace" metadata column), calls Process on
// it, and expects the rendered text both as the return value and under the
// "message" key of the map handed to Process.
func TestLinefmtParser_Process(t *testing.T) {
	// A single instant, one second ahead of now and pinned to UTC, is shared
	// by all cases.
	timestamp := time.Now().UTC().Add(time.Second)
	tsValue := arrow.Timestamp(timestamp.UnixNano())

	schema := arrow.NewSchema(
		[]arrow.Field{
			semconv.FieldFromIdent(semconv.ColumnIdentMessage, false),
			semconv.FieldFromIdent(semconv.ColumnIdentTimestamp, false),
			semconv.FieldFromIdent(semconv.NewIdentifier("namespace", types.ColumnTypeMetadata, types.Loki.String), false),
		},
		nil, // No metadata
	)

	cases := []struct {
		name      string
		line      string
		timeVal   arrow.Timestamp
		namespace string
		lineFmt   string
		want      string
	}{
		{
			name:      "simple replacement",
			line:      "this is my test line of logs",
			timeVal:   tsValue,
			namespace: "dev",
			lineFmt:   "{{.namespace}}",
			want:      "dev",
		},
		{
			name:      "replacement plus addition",
			line:      "this is my test line of logs",
			timeVal:   tsValue,
			namespace: "dev",
			lineFmt:   "{{.namespace}} foo",
			want:      "dev foo",
		},
		{
			name:      "timestamp",
			line:      "this is my test line of logs",
			timeVal:   tsValue,
			namespace: "dev",
			lineFmt:   "{{.timestamp}} foo",
			// timestamp is already in UTC, so the literal "Z" suffix in the
			// layout is accurate; the ".999999999" verb trims trailing zeros.
			want: timestamp.Format("2006-01-02T15:04:05.999999999Z") + " foo",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			mem := memory.DefaultAllocator

			// One builder per schema column, each holding exactly one row.
			msgBuilder := array.NewStringBuilder(mem)
			tsBuilder := array.NewTimestampBuilder(mem, &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: "UTC"})
			nsBuilder := array.NewStringBuilder(mem)

			tsBuilder.Append(tc.timeVal)
			msgBuilder.Append(tc.line)
			nsBuilder.Append(tc.namespace)

			// Column order must follow the schema: message, timestamp, namespace.
			batch := array.NewRecordBatch(
				schema,
				[]arrow.Array{msgBuilder.NewArray(), tsBuilder.NewArray(), nsBuilder.NewArray()},
				1,
			)

			formatter, err := NewFormatter(tc.lineFmt)
			require.NoError(t, err)

			fields := map[string]string{}
			got, err := formatter.Process(tc.line, batch, fields)
			require.NoError(t, err)

			require.Equal(t, tc.want, got)
			require.Equal(t, tc.want, fields["message"])
		})
	}
}
// TestBuildLinefmtColumns_TemplateExecutionErrorProducesErrorDetails feeds
// buildLinefmtColumns a one-row batch (message and timestamp only) together
// with the template "{{.foo | now}}" and checks that the result consists of
// an error column and an error-details column: the former must carry
// types.LinefmtParserErrorType, the latter must mention
// "wrong number of args for now".
func TestBuildLinefmtColumns_TemplateExecutionErrorProducesErrorDetails(t *testing.T) {
	timestamp := time.Now().In(time.UTC).Add(time.Second)

	schema := arrow.NewSchema(
		[]arrow.Field{
			semconv.FieldFromIdent(semconv.ColumnIdentMessage, false),
			semconv.FieldFromIdent(semconv.ColumnIdentTimestamp, false),
		},
		nil,
	)

	logBuilder := array.NewStringBuilder(memory.DefaultAllocator)
	tsBuilder := array.NewTimestampBuilder(memory.DefaultAllocator, &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: "UTC"})
	logBuilder.Append("line")
	tsBuilder.Append(arrow.Timestamp(timestamp.UnixNano()))

	// The message column is kept as a concrete *array.String because
	// buildLinefmtColumns receives it separately from the batch.
	logArray := logBuilder.NewStringArray()
	tsArray := tsBuilder.NewArray()
	recordBatch := array.NewRecordBatch(schema, []arrow.Array{logArray, tsArray}, 1)

	headers, columns := buildLinefmtColumns(recordBatch, logArray, "{{.foo | now}}")
	require.Equal(t, []string{
		semconv.ColumnIdentError.ShortName(),
		semconv.ColumnIdentErrorDetails.ShortName(),
	}, headers)

	// Guard the indexing and type assertions below so that a regression shows
	// up as an ordinary test failure instead of a panic that aborts the whole
	// package test binary.
	require.Len(t, columns, len(headers))

	errColumn, ok := columns[0].(*array.String)
	require.True(t, ok, "error column has type %T, want *array.String", columns[0])
	errDetailsColumn, ok := columns[1].(*array.String)
	require.True(t, ok, "error details column has type %T, want *array.String", columns[1])

	// Value(0) is read from both columns, so each must hold at least one row.
	require.NotZero(t, errColumn.Len(), "error column is empty")
	require.NotZero(t, errDetailsColumn.Len(), "error details column is empty")

	require.Equal(t, types.LinefmtParserErrorType, errColumn.Value(0))
	require.Contains(t, errDetailsColumn.Value(0), "wrong number of args for now")
}