Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/dataobj/sections/postings/reader_test.go

96 lines
2.8 KiB

package postings_test
import (
"context"
"errors"
"io"
"testing"
"time"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/sections/postings"
"github.com/grafana/loki/v3/pkg/util/arrowtest"
)
func TestReader_ReadBeforeOpen(t *testing.T) {
r := postings.NewReader(postings.ReaderOptions{
Allocator: memory.DefaultAllocator,
})
rec, err := r.Read(context.Background(), 128)
require.Nil(t, rec)
require.ErrorContains(t, err, "reader not opened")
}
func TestReader_RoundTrip(t *testing.T) {
// Build a tiny postings section with one label entry, read it back.
b := postings.NewBuilder(nil, 0, 0)
ts := time.Unix(0, 0).UTC()
b.ObserveLabelPosting(postings.LabelObservation{ObjectPath: "/obj", SectionIndex: 0, ColumnName: "env", LabelValue: "prod", StreamID: 1, Timestamp: ts, UncompressedSize: 100})
objBuilder := dataobj.NewBuilder(nil)
require.NoError(t, objBuilder.Append(b))
obj, closer, err := objBuilder.Flush()
require.NoError(t, err)
t.Cleanup(func() { _ = closer.Close() })
var sec *postings.Section
for _, s := range obj.Sections() {
if !postings.CheckSection(s) {
continue
}
sec, err = postings.Open(context.Background(), s)
require.NoError(t, err)
break
}
require.NotNil(t, sec)
r := postings.NewReader(postings.ReaderOptions{
Columns: sec.Columns(),
Allocator: memory.DefaultAllocator,
})
require.NoError(t, r.Open(context.Background()))
t.Cleanup(func() { _ = r.Close() })
tbl, err := readTable(context.Background(), r)
require.NoError(t, err)
actual, err := arrowtest.TableRows(memory.DefaultAllocator, tbl)
require.NoError(t, err)
require.Len(t, actual, 1)
row := actual[0]
require.Equal(t, int64(postings.KindLabel), row["kind.int64"])
require.Equal(t, "/obj", row["object_path.utf8"])
require.Equal(t, int64(0), row["section_index.int64"])
require.Equal(t, "env", row["column_name.utf8"])
require.Equal(t, "prod", row["label_value.utf8"])
require.Equal(t, int64(100), row["uncompressed_size.int64"])
require.Equal(t, ts.UTC(), row["min_timestamp.timestamp"])
require.Equal(t, ts.UTC(), row["max_timestamp.timestamp"])
}
// readTable drains a Reader into an arrow.Table, mirroring the helper in
// streams/reader_test.go.
func readTable(ctx context.Context, r *postings.Reader) (arrow.Table, error) {
var recs []arrow.RecordBatch
for {
rec, err := r.Read(ctx, 128)
if rec != nil && rec.NumRows() > 0 {
recs = append(recs, rec)
}
if err != nil && errors.Is(err, io.EOF) {
break
} else if err != nil {
return nil, err
}
}
if len(recs) == 0 {
return nil, io.EOF
}
return array.NewTableFromRecords(recs[0].Schema(), recs), nil
}