Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/engine/internal/executor/streams_view_test.go

165 lines
4.5 KiB

package executor
import (
"iter"
"slices"
"testing"
"time"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
)
func Test_streamsView(t *testing.T) {
inputStreams := []labels.Labels{
labels.FromStrings("app", "loki", "env", "prod", "region", "us-west"),
labels.FromStrings("app", "loki", "env", "dev"),
labels.FromStrings("app", "loki", "env", "prod", "region", "us-east"),
}
sec := buildStreamsSection(t, inputStreams)
t.Run("default", func(t *testing.T) {
view := newStreamsView(sec, &streamsViewOptions{
BatchSize: 1,
})
var actual []labels.Labels
for id := 1; id <= 3; id++ {
it, err := view.Labels(t.Context(), int64(id))
require.NoError(t, err, "failed to get labels iterator")
actual = append(actual, collectLabels(it))
}
require.Equal(t, inputStreams, actual, "expected all streams to be returned")
})
t.Run("one stream", func(t *testing.T) {
view := newStreamsView(sec, &streamsViewOptions{
StreamIDs: []int64{2},
BatchSize: 1,
})
var actual []labels.Labels
it, err := view.Labels(t.Context(), int64(2))
require.NoError(t, err, "failed to get labels iterator")
actual = append(actual, collectLabels(it))
expected := []labels.Labels{
inputStreams[1], // Stream ID 2
}
require.Equal(t, expected, actual, "expected only specified streams to be returned")
})
t.Run("specific streams", func(t *testing.T) {
view := newStreamsView(sec, &streamsViewOptions{
StreamIDs: []int64{2, 3},
BatchSize: 1,
})
var actual []labels.Labels
for _, id := range []int{2, 3} {
it, err := view.Labels(t.Context(), int64(id))
require.NoError(t, err, "failed to get labels iterator")
actual = append(actual, collectLabels(it))
}
expected := []labels.Labels{
inputStreams[1], // Stream ID 2
inputStreams[2], // Stream ID 3
}
require.Equal(t, expected, actual, "expected only specified streams to be returned")
})
t.Run("one label", func(t *testing.T) {
regionColumnIndex := slices.IndexFunc(sec.Columns(), func(c *streams.Column) bool {
return c.Name == "region"
})
view := newStreamsView(sec, &streamsViewOptions{
LabelColumns: []*streams.Column{sec.Columns()[regionColumnIndex]},
BatchSize: 1,
})
expect := []labels.Labels{
labels.FromStrings("region", "us-west"),
labels.FromStrings(),
labels.FromStrings("region", "us-east"),
}
var actual []labels.Labels
for id := 1; id <= 3; id++ {
it, err := view.Labels(t.Context(), int64(id))
require.NoError(t, err, "failed to get labels iterator")
actual = append(actual, collectLabels(it))
}
require.Equal(t, expect, actual, "expected all streams to be returned with the proper labels")
})
t.Run("two labels", func(t *testing.T) {
appColumnIndex := slices.IndexFunc(sec.Columns(), func(c *streams.Column) bool {
return c.Name == "app"
})
envColumnIndex := slices.IndexFunc(sec.Columns(), func(c *streams.Column) bool {
return c.Name == "env"
})
view := newStreamsView(sec, &streamsViewOptions{
LabelColumns: []*streams.Column{
sec.Columns()[appColumnIndex],
sec.Columns()[envColumnIndex],
},
BatchSize: 1,
})
expect := []labels.Labels{
labels.FromStrings("app", "loki", "env", "prod"),
labels.FromStrings("app", "loki", "env", "dev"),
labels.FromStrings("app", "loki", "env", "prod"),
}
var actual []labels.Labels
for id := 1; id <= 3; id++ {
it, err := view.Labels(t.Context(), int64(id))
require.NoError(t, err, "failed to get labels iterator")
actual = append(actual, collectLabels(it))
}
require.Equal(t, expect, actual, "expected all streams to be returned with the proper labels")
})
}
func buildStreamsSection(t *testing.T, streamLabels []labels.Labels) *streams.Section {
streamsBuilder := streams.NewBuilder(nil, 8192, 0)
for _, stream := range streamLabels {
_ = streamsBuilder.Record(stream, time.Now().UTC(), 0)
}
objBuilder := dataobj.NewBuilder(nil)
require.NoError(t, objBuilder.Append(streamsBuilder), "failed to append streams section")
obj, closer, err := objBuilder.Flush()
require.NoError(t, err, "failed to flush dataobj")
t.Cleanup(func() { closer.Close() })
sec, err := streams.Open(t.Context(), obj.Sections()[0])
require.NoError(t, err, "failed to open streams section")
return sec
}
func collectLabels(it iter.Seq[labels.Label]) labels.Labels {
var ls []labels.Label
for l := range it {
ls = append(ls, l)
}
return labels.New(ls...)
}