Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/engine/executor/stream_injector_test.go

55 lines
1.8 KiB

package executor
import (
"testing"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/util/arrowtest"
)
// Test_streamInjector feeds a record keyed by stream ID through a stream
// injector and checks that the resulting rows carry the labels of the
// referenced stream in place of the stream ID column.
func Test_streamInjector(t *testing.T) {
	// Registered first so it runs last: by then every record created below
	// has been released and the checked allocator must be back at zero bytes.
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(t, 0)

	// The expectations further down treat stream IDs as 1-based positions in
	// this slice (ID 1 is the us-west stream, ID 2 the dev stream, ...).
	streamsSec := buildStreamsSection(t, []labels.Labels{
		labels.FromStrings("app", "loki", "env", "prod", "region", "us-west"),
		labels.FromStrings("app", "loki", "env", "dev"),
		labels.FromStrings("app", "loki", "env", "prod", "region", "us-east"),
	})

	rows := arrowtest.Rows{
		{"stream_id.int64": 2, "ts": int64(1), "line": "log line 1"},
		{"stream_id.int64": 1, "ts": int64(2), "line": "log line 2"},
		{"stream_id.int64": 3, "ts": int64(3), "line": "log line 3"},
		{"stream_id.int64": 2, "ts": int64(4), "line": "log line 4"},
	}
	schema := rows.Schema()
	rec := rows.Record(mem, schema)
	defer rec.Release()

	streams := newStreamsView(streamsSec, &streamsViewOptions{})
	defer streams.Close()

	got, err := newStreamInjector(mem, streams).Inject(t.Context(), rec)
	// Schedule the release before asserting on err so a returned record is
	// freed even when the assertion aborts the test.
	if got != nil {
		defer got.Release()
	}
	require.NoError(t, err)

	// The dev stream has no "region" label, so its rows expect nil there.
	want := arrowtest.Rows{
		{"app": "loki", "env": "dev", "region": nil, "ts": int64(1), "line": "log line 1"},
		{"app": "loki", "env": "prod", "region": "us-west", "ts": int64(2), "line": "log line 2"},
		{"app": "loki", "env": "prod", "region": "us-east", "ts": int64(3), "line": "log line 3"},
		{"app": "loki", "env": "dev", "region": nil, "ts": int64(4), "line": "log line 4"},
	}

	gotRows, err := arrowtest.RecordRows(got)
	require.NoError(t, err, "failed to convert output record to rows")
	require.Equal(t, want, gotRows, "expected output to match input with stream labels injected")
}