Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/dataobj/sections/stats/builder_test.go

527 lines
17 KiB

package stats
import (
"context"
"errors"
"io"
"strings"
"testing"
"time"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/util/arrowtest"
)
// defaultEncoder is the SectionEncoder shared by most tests in this file. Its
// limits (1 MiB, 10,000) are generous compared with the small fixtures used
// here; TestBuilder_SectionSplitting uses tiny limits instead.
var defaultEncoder = ColumnarSectionEncoder(1<<20, 10_000)
// buildObject sets the tenant of b to "test-tenant", appends b to a fresh
// dataobj.Builder and flushes it. It returns the flushed object together with
// the io.Closer returned by Flush, which the caller is responsible for closing.
// Any error fails the test immediately.
func buildObject(t *testing.T, b *Builder) (*dataobj.Object, io.Closer) {
	t.Helper()

	b.SetTenant("test-tenant")

	builder := dataobj.NewBuilder(nil)
	require.NoError(t, builder.Append(b))

	obj, closer, err := builder.Flush()
	require.NoError(t, err)
	return obj, closer
}
// readTable drains r into a single arrow.Table, mirroring the helper in
// streams/reader_test.go.
//
// It calls r.Read with a batch size of 128 until r reports io.EOF, keeping
// only batches that contain rows. It returns io.EOF when no non-empty batch
// was read, and returns any other read error unchanged.
//
// Every record batch obtained from r is released before returning. The table
// built by array.NewTableFromRecords retains its own references to the column
// data, so it stays valid. The caller owns the returned table.
func readTable(ctx context.Context, r *Reader) (arrow.Table, error) {
	var recs []arrow.RecordBatch
	defer func() {
		// Runs after the return value (the table) has been built.
		for _, rec := range recs {
			rec.Release()
		}
	}()

	for {
		rec, err := r.Read(ctx, 128)
		if rec != nil {
			if rec.NumRows() > 0 {
				recs = append(recs, rec)
			} else {
				// Empty batches are not kept, so release them right away.
				rec.Release()
			}
		}

		// A batch may accompany io.EOF; it has already been handled above.
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			return nil, err
		}
	}

	if len(recs) == 0 {
		return nil, io.EOF
	}
	return array.NewTableFromRecords(recs[0].Schema(), recs), nil
}
// readAllRowsFromObject returns the rows of every stats section in obj.
// Sections are visited in obj.Sections() order, and their rows are appended in
// that order. Non-stats sections, and stats sections that yield no rows, are
// skipped. Each reader opened here is closed when the test finishes.
func readAllRowsFromObject(t *testing.T, obj *dataobj.Object) arrowtest.Rows {
	t.Helper()

	ctx := context.Background()

	var rows arrowtest.Rows
	for _, section := range obj.Sections() {
		if !CheckSection(section) {
			continue
		}

		statsSection, err := Open(ctx, section)
		require.NoError(t, err)

		reader := NewReader(ReaderOptions{
			Columns:   statsSection.Columns(),
			Allocator: memory.DefaultAllocator,
		})
		require.NoError(t, reader.Open(ctx))
		t.Cleanup(func() { _ = reader.Close() })

		tbl, err := readTable(ctx, reader)
		if errors.Is(err, io.EOF) {
			// No rows in this section.
			continue
		}
		require.NoError(t, err)

		sectionRows, err := arrowtest.TableRows(memory.DefaultAllocator, tbl)
		require.NoError(t, err)
		rows = append(rows, sectionRows...)
	}
	return rows
}
// TestBuilder_Empty verifies that a freshly constructed builder reports a
// zero estimated size and holds no rows.
func TestBuilder_Empty(t *testing.T) {
	builder := NewBuilder(nil, defaultEncoder)

	size := builder.EstimatedSize()
	require.Zero(t, size, "empty builder should have zero size")
	require.Empty(t, builder.rows, "empty builder should have no rows")
}
// TestBuilder_RoundTrip writes three stats into an object, reads them back, and
// checks that every column survives encoding. The rows come back ordered by
// their service_name label.
func TestBuilder_RoundTrip(t *testing.T) {
	b := NewBuilder(nil, defaultEncoder)

	input := []Stat{
		{
			ObjectPath:       "/tenant/abc/obj1",
			SectionIndex:     0,
			SortSchema:       "service_name",
			Labels:           map[string]string{"service_name": "foo"},
			MinTimestamp:     1000,
			MaxTimestamp:     2000,
			RowCount:         100,
			UncompressedSize: 8192,
		},
		{
			ObjectPath:       "/tenant/abc/obj2",
			SectionIndex:     1,
			SortSchema:       "service_name",
			Labels:           map[string]string{"service_name": "bar"},
			MinTimestamp:     500,
			MaxTimestamp:     1500,
			RowCount:         50,
			UncompressedSize: 4096,
		},
		{
			ObjectPath:       "/tenant/abc/obj3",
			SectionIndex:     2,
			SortSchema:       "service_name",
			Labels:           map[string]string{"service_name": "baz"},
			MinTimestamp:     3000,
			MaxTimestamp:     4000,
			RowCount:         200,
			UncompressedSize: 16384,
		},
	}
	for i := range input {
		b.Append(input[i])
	}

	obj, closer := buildObject(t, b)
	t.Cleanup(func() { _ = closer.Close() })

	actual := readAllRowsFromObject(t, obj)

	// Sorted by label value: bar (input[1]) < baz (input[2]) < foo (input[0]).
	var expected arrowtest.Rows
	for _, idx := range []int{1, 2, 0} {
		s := input[idx]
		expected = append(expected, map[string]any{
			"object_path.utf8":        s.ObjectPath,
			"section_index.int64":     int64(s.SectionIndex),
			"sort_schema.utf8":        s.SortSchema,
			"min_timestamp.timestamp": time.Unix(0, s.MinTimestamp).UTC(),
			"max_timestamp.timestamp": time.Unix(0, s.MaxTimestamp).UTC(),
			"row_count.int64":         int64(s.RowCount),
			"uncompressed_size.int64": int64(s.UncompressedSize),
			"service_name.label.utf8": s.Labels["service_name"],
		})
	}
	require.Equal(t, expected, actual)
}
// TestBuilder_SortOrder verifies that rows are ordered by label value (in
// sort-schema order) and then by MinTimestamp. MaxTimestamp, the final
// tie-breaker, is zero for every input here and so is not exercised.
func TestBuilder_SortOrder(t *testing.T) {
	b := NewBuilder(nil, defaultEncoder)

	// Appended deliberately out of order.
	for _, in := range []struct {
		service string
		minTS   int64
	}{
		{"beta", 200},
		{"alpha", 300},
		{"alpha", 100},
		{"gamma", 50},
		{"alpha", 200},
	} {
		b.Append(Stat{
			SortSchema:   "service_name",
			Labels:       map[string]string{"service_name": in.service},
			MinTimestamp: in.minTS,
		})
	}

	obj, closer := buildObject(t, b)
	t.Cleanup(func() { _ = closer.Close() })
	actual := readAllRowsFromObject(t, obj)

	// row builds an expected row; every column not set by the inputs is zero.
	row := func(service string, minTS int64) map[string]any {
		return map[string]any{
			"object_path.utf8":        "",
			"section_index.int64":     int64(0),
			"sort_schema.utf8":        "service_name",
			"min_timestamp.timestamp": time.Unix(0, minTS).UTC(),
			"max_timestamp.timestamp": time.Unix(0, 0).UTC(),
			"row_count.int64":         int64(0),
			"uncompressed_size.int64": int64(0),
			"service_name.label.utf8": service,
		}
	}

	expected := arrowtest.Rows{
		row("alpha", 100),
		row("alpha", 200),
		row("alpha", 300),
		row("beta", 200),
		row("gamma", 50),
	}
	require.Equal(t, expected, actual)
}
// TestBuilder_AllSameServiceName verifies that when every row shares the same
// service_name, MinTimestamp alone decides the order.
func TestBuilder_AllSameServiceName(t *testing.T) {
	b := NewBuilder(nil, defaultEncoder)

	// Same label everywhere, timestamps appended out of order.
	for _, in := range []struct {
		path  string
		minTS int64
	}{
		{"c", 300},
		{"a", 100},
		{"b", 200},
	} {
		b.Append(Stat{
			SortSchema:   "service_name",
			Labels:       map[string]string{"service_name": "svc"},
			MinTimestamp: in.minTS,
			ObjectPath:   in.path,
		})
	}

	obj, closer := buildObject(t, b)
	t.Cleanup(func() { _ = closer.Close() })
	actual := readAllRowsFromObject(t, obj)

	row := func(path string, minTS int64) map[string]any {
		return map[string]any{
			"object_path.utf8":        path,
			"section_index.int64":     int64(0),
			"sort_schema.utf8":        "service_name",
			"min_timestamp.timestamp": time.Unix(0, minTS).UTC(),
			"max_timestamp.timestamp": time.Unix(0, 0).UTC(),
			"row_count.int64":         int64(0),
			"uncompressed_size.int64": int64(0),
			"service_name.label.utf8": "svc",
		}
	}

	expected := arrowtest.Rows{
		row("a", 100),
		row("b", 200),
		row("c", 300),
	}
	require.Equal(t, expected, actual)
}
// TestBuilder_MissingServiceName verifies that a row whose label value is the
// empty string sorts ahead of rows with a non-empty value.
func TestBuilder_MissingServiceName(t *testing.T) {
	b := NewBuilder(nil, defaultEncoder)
	b.Append(Stat{SortSchema: "service_name", Labels: map[string]string{"service_name": ""}, ObjectPath: "obj1", MinTimestamp: 100})
	b.Append(Stat{SortSchema: "service_name", Labels: map[string]string{"service_name": "svc"}, ObjectPath: "obj2", MinTimestamp: 200})

	obj, closer := buildObject(t, b)
	t.Cleanup(func() { _ = closer.Close() })
	actual := readAllRowsFromObject(t, obj)

	row := func(path string, minTS int64, service string) map[string]any {
		return map[string]any{
			"object_path.utf8":        path,
			"section_index.int64":     int64(0),
			"sort_schema.utf8":        "service_name",
			"min_timestamp.timestamp": time.Unix(0, minTS).UTC(),
			"max_timestamp.timestamp": time.Unix(0, 0).UTC(),
			"row_count.int64":         int64(0),
			"uncompressed_size.int64": int64(0),
			"service_name.label.utf8": service,
		}
	}

	// "" < "svc".
	expected := arrowtest.Rows{
		row("obj1", 100, ""),
		row("obj2", 200, "svc"),
	}
	require.Equal(t, expected, actual)
}
// TestBuilder_SectionSplitting verifies the mid-accumulation flush pattern
// with dataobj.Builder:
//  1. Append a first batch and flush it into a section via objBuilder.Append.
//  2. Append a second batch and flush again.
//  3. Expect two stats sections whose rows come back in flush order.
func TestBuilder_SectionSplitting(t *testing.T) {
	// Use a very small page size to force multiple pages.
	smallEncoder := ColumnarSectionEncoder(100, 2)
	b := NewBuilder(nil, smallEncoder)
	b.SetTenant("test-tenant")

	// Append first batch.
	for i := range 3 {
		b.Append(Stat{
			ObjectPath:   "x",
			SortSchema:   "service_name",
			Labels:       map[string]string{"service_name": "svc"},
			MinTimestamp: int64(i * 100),
		})
	}

	objBuilder := dataobj.NewBuilder(nil)
	// Flush first batch into objBuilder.
	require.NoError(t, objBuilder.Append(b))

	// Append second batch.
	for i := range 3 {
		b.Append(Stat{
			ObjectPath:   "y",
			SortSchema:   "service_name",
			Labels:       map[string]string{"service_name": "svc"},
			MinTimestamp: int64((i + 3) * 100),
		})
	}
	// Flush second batch into objBuilder.
	require.NoError(t, objBuilder.Append(b))

	obj, closer, err := objBuilder.Flush()
	require.NoError(t, err)
	t.Cleanup(func() { _ = closer.Close() })

	// Verify 2 sections were created.
	var statsSections []*dataobj.Section
	for _, s := range obj.Sections() {
		if CheckSection(s) {
			statsSections = append(statsSections, s)
		}
	}
	require.Len(t, statsSections, 2, "expected 2 stats sections after two flushes")

	// Rows are concatenated in section order, so the first batch ("x") must
	// precede the second ("y"). Across both batches MinTimestamp rises in
	// steps of 100, so row i has MinTimestamp i*100.
	actual := readAllRowsFromObject(t, obj)
	require.Len(t, actual, 6)
	for i, row := range actual {
		wantPath := "x"
		if i >= 3 {
			wantPath = "y"
		}
		require.Equal(t, wantPath, row["object_path.utf8"], "row %d", i)
		require.Equal(t, time.Unix(0, int64(i*100)).UTC(), row["min_timestamp.timestamp"], "row %d", i)
	}
}
// TestBuilder_LargeValues checks that multi-kilobyte object paths, sort
// schemas and label values, plus large numeric values, are read back exactly
// as written.
func TestBuilder_LargeValues(t *testing.T) {
	var (
		longPath   = "/" + strings.Repeat("a", 10000)
		longLabel  = strings.Repeat("b", 5000)
		longSchema = strings.Repeat("c", 2000)
	)

	b := NewBuilder(nil, defaultEncoder)
	b.Append(Stat{
		ObjectPath:       longPath,
		SortSchema:       longSchema,
		Labels:           map[string]string{longSchema: longLabel},
		SectionIndex:     99,
		MinTimestamp:     1_000_000,
		MaxTimestamp:     2_000_000,
		RowCount:         99999,
		UncompressedSize: 1_000_000_000,
	})

	obj, closer := buildObject(t, b)
	t.Cleanup(func() { _ = closer.Close() })

	actual := readAllRowsFromObject(t, obj)
	require.Len(t, actual, 1)

	// The label column is named after the (long) label key.
	expected := arrowtest.Rows{{
		"object_path.utf8":        longPath,
		"section_index.int64":     int64(99),
		"sort_schema.utf8":        longSchema,
		"min_timestamp.timestamp": time.Unix(0, 1_000_000).UTC(),
		"max_timestamp.timestamp": time.Unix(0, 2_000_000).UTC(),
		"row_count.int64":         int64(99999),
		"uncompressed_size.int64": int64(1_000_000_000),
		longSchema + ".label.utf8": longLabel,
	}}
	require.Equal(t, expected, actual)
}
// TestBuilder_ResetAndReuse verifies that Reset discards previously appended
// rows. It also checks that the same builder can then accept new data and
// flush it.
func TestBuilder_ResetAndReuse(t *testing.T) {
	b := NewBuilder(nil, defaultEncoder)
	b.SetTenant("test-tenant")
	b.Append(Stat{SortSchema: "service_name", Labels: map[string]string{"service_name": "first"}, MinTimestamp: 100})

	b.Reset()

	// Nothing from the first append may survive Reset.
	require.Empty(t, b.rows, "builder should be empty after reset")
	require.Zero(t, b.EstimatedSize(), "estimated size should be zero after reset")

	// Reuse the builder; only the post-Reset row may appear in the object.
	b.Append(Stat{SortSchema: "service_name", Labels: map[string]string{"service_name": "second"}, MinTimestamp: 200})
	obj, closer := buildObject(t, b)
	t.Cleanup(func() { _ = closer.Close() })

	require.Equal(t, arrowtest.Rows{{
		"object_path.utf8":        "",
		"section_index.int64":     int64(0),
		"sort_schema.utf8":        "service_name",
		"min_timestamp.timestamp": time.Unix(0, 200).UTC(),
		"max_timestamp.timestamp": time.Unix(0, 0).UTC(),
		"row_count.int64":         int64(0),
		"uncompressed_size.int64": int64(0),
		"service_name.label.utf8": "second",
	}}, readAllRowsFromObject(t, obj))
}
// TestBuilder_EstimatedSize pins the size estimate for a single appended stat.
// The expected value is 8 bytes for each of the five integer fields, plus the
// byte length of each string field, label key and label value.
func TestBuilder_EstimatedSize(t *testing.T) {
	b := NewBuilder(nil, defaultEncoder)
	require.Zero(t, b.EstimatedSize(), "empty builder should have zero estimated size")

	b.Append(Stat{
		ObjectPath: "obj",
		SortSchema: "sch",
		Labels:     map[string]string{"sch": "svc"},
	})

	// SectionIndex, MinTimestamp, MaxTimestamp, RowCount, UncompressedSize,
	// then ObjectPath, SortSchema, label key, label value: 40 + 12 = 52.
	const want = 5*8 + len("obj") + len("sch") + len("sch") + len("svc")
	require.Equal(t, want, b.EstimatedSize())
}
// TestBuilder_FlushResetsBuilder verifies that flushing a builder through
// dataobj.Builder leaves it empty. The flushed object must hold exactly one
// section.
func TestBuilder_FlushResetsBuilder(t *testing.T) {
	b := NewBuilder(nil, defaultEncoder)
	b.Append(Stat{SortSchema: "service_name", Labels: map[string]string{"service_name": "svc"}, MinTimestamp: 100})

	obj, closer := buildObject(t, b)
	// Close only when the test ends, so obj is never inspected after its
	// closer has run. This matches how the other tests in this file close it.
	t.Cleanup(func() { _ = closer.Close() })

	require.Len(t, obj.Sections(), 1)

	// After flush, builder should be empty (Reset was called).
	require.Empty(t, b.rows, "builder should be empty after flush")
	require.Zero(t, b.EstimatedSize(), "builder should have zero estimated size after flush")
}
// TestBuilder_Type checks that a Builder reports the stats section type.
func TestBuilder_Type(t *testing.T) {
	got := NewBuilder(nil, defaultEncoder).Type()
	require.Equal(t, sectionType, got)
}
// TestOpen_WrongSectionType checks that Open refuses a section whose kind is
// not "stats", reporting a type mismatch.
func TestOpen_WrongSectionType(t *testing.T) {
	sec := &dataobj.Section{
		Type: dataobj.SectionType{Namespace: "github.com/grafana/loki", Kind: "postings", Version: 1},
	}

	_, err := Open(context.Background(), sec)
	require.ErrorContains(t, err, "section type mismatch")
}
// TestOpen_WrongVersion checks that Open refuses a stats section carrying a
// version it does not support.
func TestOpen_WrongVersion(t *testing.T) {
	sec := &dataobj.Section{
		Type: dataobj.SectionType{Namespace: "github.com/grafana/loki", Kind: "stats", Version: 99},
	}

	_, err := Open(context.Background(), sec)
	require.ErrorContains(t, err, "unsupported section version")
}