Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/dataobj/sections/stats/reader_test.go

99 lines
2.6 KiB

package stats_test
import (
"context"
"errors"
"io"
"testing"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/sections/stats"
"github.com/grafana/loki/v3/pkg/util/arrowtest"
)
func TestReader_ReadBeforeOpen(t *testing.T) {
r := stats.NewReader(stats.ReaderOptions{
Allocator: memory.DefaultAllocator,
})
rec, err := r.Read(context.Background(), 128)
require.Nil(t, rec)
require.ErrorContains(t, err, "reader not opened")
}
func TestReader_RoundTrip(t *testing.T) {
b := stats.NewBuilder(nil, stats.ColumnarSectionEncoder(1024*1024, 10000))
b.SetTenant("test-tenant")
b.Append(stats.Stat{
ObjectPath: "/obj",
SectionIndex: 0,
SortSchema: "service_name",
Labels: map[string]string{"service_name": "svc"},
MinTimestamp: 100,
MaxTimestamp: 200,
RowCount: 5,
UncompressedSize: 50,
})
objBuilder := dataobj.NewBuilder(nil)
require.NoError(t, objBuilder.Append(b))
obj, closer, err := objBuilder.Flush()
require.NoError(t, err)
t.Cleanup(func() { _ = closer.Close() })
var sec *stats.Section
for _, s := range obj.Sections() {
if !stats.CheckSection(s) {
continue
}
sec, err = stats.Open(context.Background(), s)
require.NoError(t, err)
break
}
require.NotNil(t, sec)
r := stats.NewReader(stats.ReaderOptions{
Columns: sec.Columns(),
Allocator: memory.DefaultAllocator,
})
require.NoError(t, r.Open(context.Background()))
t.Cleanup(func() { _ = r.Close() })
tbl, err := readTable(context.Background(), r)
require.NoError(t, err)
actual, err := arrowtest.TableRows(memory.DefaultAllocator, tbl)
require.NoError(t, err)
require.Len(t, actual, 1)
row := actual[0]
require.Equal(t, "/obj", row["object_path.utf8"])
require.Equal(t, "service_name", row["sort_schema.utf8"])
require.Equal(t, int64(5), row["row_count.int64"])
require.Equal(t, int64(50), row["uncompressed_size.int64"])
require.Equal(t, "svc", row["service_name.label.utf8"])
}
// readTable drains a Reader into an arrow.Table, mirroring the helper in
// streams/reader_test.go.
func readTable(ctx context.Context, r *stats.Reader) (arrow.Table, error) {
var recs []arrow.RecordBatch
for {
rec, err := r.Read(ctx, 128)
if rec != nil && rec.NumRows() > 0 {
recs = append(recs, rec)
}
if err != nil && errors.Is(err, io.EOF) {
break
} else if err != nil {
return nil, err
}
}
if len(recs) == 0 {
return nil, io.EOF
}
return array.NewTableFromRecords(recs[0].Schema(), recs), nil
}