mirror of https://github.com/grafana/loki
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
527 lines
17 KiB
527 lines
17 KiB
package stats
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"io"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/apache/arrow-go/v18/arrow"
|
|
"github.com/apache/arrow-go/v18/arrow/array"
|
|
"github.com/apache/arrow-go/v18/arrow/memory"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/grafana/loki/v3/pkg/dataobj"
|
|
"github.com/grafana/loki/v3/pkg/util/arrowtest"
|
|
)
|
|
|
|
// defaultEncoder is a SectionEncoder using default page sizes for testing.
|
|
var defaultEncoder = ColumnarSectionEncoder(1024*1024, 10000)
|
|
|
|
// buildObject flushes the builder into a dataobj.Object using dataobj.Builder.
|
|
func buildObject(t *testing.T, b *Builder) (*dataobj.Object, io.Closer) {
|
|
t.Helper()
|
|
b.SetTenant("test-tenant")
|
|
objBuilder := dataobj.NewBuilder(nil)
|
|
err := objBuilder.Append(b)
|
|
require.NoError(t, err)
|
|
obj, closer, err := objBuilder.Flush()
|
|
require.NoError(t, err)
|
|
return obj, closer
|
|
}
|
|
|
|
// readTable drains a Reader into an arrow.Table, mirroring streams/reader_test.go.
|
|
func readTable(ctx context.Context, r *Reader) (arrow.Table, error) {
|
|
var recs []arrow.RecordBatch
|
|
for {
|
|
rec, err := r.Read(ctx, 128)
|
|
if rec != nil && rec.NumRows() > 0 {
|
|
recs = append(recs, rec)
|
|
}
|
|
if err != nil && errors.Is(err, io.EOF) {
|
|
break
|
|
} else if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
if len(recs) == 0 {
|
|
return nil, io.EOF
|
|
}
|
|
return array.NewTableFromRecords(recs[0].Schema(), recs), nil
|
|
}
|
|
|
|
// readAllRowsFromObject opens every stats section in obj and concatenates
|
|
// all rows in obj.Sections() order (stable).
|
|
func readAllRowsFromObject(t *testing.T, obj *dataobj.Object) arrowtest.Rows {
|
|
t.Helper()
|
|
var all arrowtest.Rows
|
|
for _, s := range obj.Sections() {
|
|
if !CheckSection(s) {
|
|
continue
|
|
}
|
|
sec, err := Open(context.Background(), s)
|
|
require.NoError(t, err)
|
|
|
|
r := NewReader(ReaderOptions{
|
|
Columns: sec.Columns(),
|
|
Allocator: memory.DefaultAllocator,
|
|
})
|
|
require.NoError(t, r.Open(context.Background()))
|
|
t.Cleanup(func() { _ = r.Close() })
|
|
|
|
tbl, err := readTable(context.Background(), r)
|
|
if errors.Is(err, io.EOF) {
|
|
continue
|
|
}
|
|
require.NoError(t, err)
|
|
|
|
rows, err := arrowtest.TableRows(memory.DefaultAllocator, tbl)
|
|
require.NoError(t, err)
|
|
all = append(all, rows...)
|
|
}
|
|
return all
|
|
}
|
|
|
|
// TestBuilder_Empty verifies that an empty builder produces no sections.
|
|
func TestBuilder_Empty(t *testing.T) {
|
|
b := NewBuilder(nil, defaultEncoder)
|
|
require.Zero(t, b.EstimatedSize(), "empty builder should have zero size")
|
|
require.Empty(t, b.rows, "empty builder should have no rows")
|
|
}
|
|
|
|
// TestBuilder_RoundTrip verifies stats round-trip correctly through on-disk encoding.
|
|
func TestBuilder_RoundTrip(t *testing.T) {
|
|
b := NewBuilder(nil, defaultEncoder)
|
|
|
|
input := []Stat{
|
|
{
|
|
ObjectPath: "/tenant/abc/obj1",
|
|
SectionIndex: 0,
|
|
SortSchema: "service_name",
|
|
Labels: map[string]string{"service_name": "foo"},
|
|
MinTimestamp: 1000,
|
|
MaxTimestamp: 2000,
|
|
RowCount: 100,
|
|
UncompressedSize: 8192,
|
|
},
|
|
{
|
|
ObjectPath: "/tenant/abc/obj2",
|
|
SectionIndex: 1,
|
|
SortSchema: "service_name",
|
|
Labels: map[string]string{"service_name": "bar"},
|
|
MinTimestamp: 500,
|
|
MaxTimestamp: 1500,
|
|
RowCount: 50,
|
|
UncompressedSize: 4096,
|
|
},
|
|
{
|
|
ObjectPath: "/tenant/abc/obj3",
|
|
SectionIndex: 2,
|
|
SortSchema: "service_name",
|
|
Labels: map[string]string{"service_name": "baz"},
|
|
MinTimestamp: 3000,
|
|
MaxTimestamp: 4000,
|
|
RowCount: 200,
|
|
UncompressedSize: 16384,
|
|
},
|
|
}
|
|
|
|
for _, s := range input {
|
|
b.Append(s)
|
|
}
|
|
|
|
obj, closer := buildObject(t, b)
|
|
t.Cleanup(func() { _ = closer.Close() })
|
|
|
|
actual := readAllRowsFromObject(t, obj)
|
|
// Sort order: bar < baz < foo
|
|
expected := arrowtest.Rows{
|
|
{
|
|
"object_path.utf8": "/tenant/abc/obj2",
|
|
"section_index.int64": int64(1),
|
|
"sort_schema.utf8": "service_name",
|
|
"min_timestamp.timestamp": time.Unix(0, 500).UTC(),
|
|
"max_timestamp.timestamp": time.Unix(0, 1500).UTC(),
|
|
"row_count.int64": int64(50),
|
|
"uncompressed_size.int64": int64(4096),
|
|
"service_name.label.utf8": "bar",
|
|
},
|
|
{
|
|
"object_path.utf8": "/tenant/abc/obj3",
|
|
"section_index.int64": int64(2),
|
|
"sort_schema.utf8": "service_name",
|
|
"min_timestamp.timestamp": time.Unix(0, 3000).UTC(),
|
|
"max_timestamp.timestamp": time.Unix(0, 4000).UTC(),
|
|
"row_count.int64": int64(200),
|
|
"uncompressed_size.int64": int64(16384),
|
|
"service_name.label.utf8": "baz",
|
|
},
|
|
{
|
|
"object_path.utf8": "/tenant/abc/obj1",
|
|
"section_index.int64": int64(0),
|
|
"sort_schema.utf8": "service_name",
|
|
"min_timestamp.timestamp": time.Unix(0, 1000).UTC(),
|
|
"max_timestamp.timestamp": time.Unix(0, 2000).UTC(),
|
|
"row_count.int64": int64(100),
|
|
"uncompressed_size.int64": int64(8192),
|
|
"service_name.label.utf8": "foo",
|
|
},
|
|
}
|
|
require.Equal(t, expected, actual)
|
|
}
|
|
|
|
// TestBuilder_SortOrder verifies the sort order: label values in sort-schema order,
|
|
// then MinTimestamp, then MaxTimestamp.
|
|
func TestBuilder_SortOrder(t *testing.T) {
|
|
b := NewBuilder(nil, defaultEncoder)
|
|
|
|
// Intentionally appended out of order.
|
|
b.Append(Stat{SortSchema: "service_name", Labels: map[string]string{"service_name": "beta"}, MinTimestamp: 200})
|
|
b.Append(Stat{SortSchema: "service_name", Labels: map[string]string{"service_name": "alpha"}, MinTimestamp: 300})
|
|
b.Append(Stat{SortSchema: "service_name", Labels: map[string]string{"service_name": "alpha"}, MinTimestamp: 100})
|
|
b.Append(Stat{SortSchema: "service_name", Labels: map[string]string{"service_name": "gamma"}, MinTimestamp: 50})
|
|
b.Append(Stat{SortSchema: "service_name", Labels: map[string]string{"service_name": "alpha"}, MinTimestamp: 200})
|
|
|
|
obj, closer := buildObject(t, b)
|
|
t.Cleanup(func() { _ = closer.Close() })
|
|
|
|
actual := readAllRowsFromObject(t, obj)
|
|
// Verify sort order: alpha(100), alpha(200), alpha(300), beta(200), gamma(50).
|
|
expected := arrowtest.Rows{
|
|
{
|
|
"object_path.utf8": "",
|
|
"section_index.int64": int64(0),
|
|
"sort_schema.utf8": "service_name",
|
|
"min_timestamp.timestamp": time.Unix(0, 100).UTC(),
|
|
"max_timestamp.timestamp": time.Unix(0, 0).UTC(),
|
|
"row_count.int64": int64(0),
|
|
"uncompressed_size.int64": int64(0),
|
|
"service_name.label.utf8": "alpha",
|
|
},
|
|
{
|
|
"object_path.utf8": "",
|
|
"section_index.int64": int64(0),
|
|
"sort_schema.utf8": "service_name",
|
|
"min_timestamp.timestamp": time.Unix(0, 200).UTC(),
|
|
"max_timestamp.timestamp": time.Unix(0, 0).UTC(),
|
|
"row_count.int64": int64(0),
|
|
"uncompressed_size.int64": int64(0),
|
|
"service_name.label.utf8": "alpha",
|
|
},
|
|
{
|
|
"object_path.utf8": "",
|
|
"section_index.int64": int64(0),
|
|
"sort_schema.utf8": "service_name",
|
|
"min_timestamp.timestamp": time.Unix(0, 300).UTC(),
|
|
"max_timestamp.timestamp": time.Unix(0, 0).UTC(),
|
|
"row_count.int64": int64(0),
|
|
"uncompressed_size.int64": int64(0),
|
|
"service_name.label.utf8": "alpha",
|
|
},
|
|
{
|
|
"object_path.utf8": "",
|
|
"section_index.int64": int64(0),
|
|
"sort_schema.utf8": "service_name",
|
|
"min_timestamp.timestamp": time.Unix(0, 200).UTC(),
|
|
"max_timestamp.timestamp": time.Unix(0, 0).UTC(),
|
|
"row_count.int64": int64(0),
|
|
"uncompressed_size.int64": int64(0),
|
|
"service_name.label.utf8": "beta",
|
|
},
|
|
{
|
|
"object_path.utf8": "",
|
|
"section_index.int64": int64(0),
|
|
"sort_schema.utf8": "service_name",
|
|
"min_timestamp.timestamp": time.Unix(0, 50).UTC(),
|
|
"max_timestamp.timestamp": time.Unix(0, 0).UTC(),
|
|
"row_count.int64": int64(0),
|
|
"uncompressed_size.int64": int64(0),
|
|
"service_name.label.utf8": "gamma",
|
|
},
|
|
}
|
|
require.Equal(t, expected, actual)
|
|
}
|
|
|
|
// TestBuilder_AllSameServiceName verifies that rows with identical service_name
|
|
// are sorted by MinTimestamp.
|
|
func TestBuilder_AllSameServiceName(t *testing.T) {
|
|
b := NewBuilder(nil, defaultEncoder)
|
|
|
|
// Multiple rows with the same service_name, different timestamps.
|
|
b.Append(Stat{SortSchema: "service_name", Labels: map[string]string{"service_name": "svc"}, MinTimestamp: 300, ObjectPath: "c"})
|
|
b.Append(Stat{SortSchema: "service_name", Labels: map[string]string{"service_name": "svc"}, MinTimestamp: 100, ObjectPath: "a"})
|
|
b.Append(Stat{SortSchema: "service_name", Labels: map[string]string{"service_name": "svc"}, MinTimestamp: 200, ObjectPath: "b"})
|
|
|
|
obj, closer := buildObject(t, b)
|
|
t.Cleanup(func() { _ = closer.Close() })
|
|
|
|
actual := readAllRowsFromObject(t, obj)
|
|
// Sort is by MinTimestamp within the same service_name.
|
|
expected := arrowtest.Rows{
|
|
{
|
|
"object_path.utf8": "a",
|
|
"section_index.int64": int64(0),
|
|
"sort_schema.utf8": "service_name",
|
|
"min_timestamp.timestamp": time.Unix(0, 100).UTC(),
|
|
"max_timestamp.timestamp": time.Unix(0, 0).UTC(),
|
|
"row_count.int64": int64(0),
|
|
"uncompressed_size.int64": int64(0),
|
|
"service_name.label.utf8": "svc",
|
|
},
|
|
{
|
|
"object_path.utf8": "b",
|
|
"section_index.int64": int64(0),
|
|
"sort_schema.utf8": "service_name",
|
|
"min_timestamp.timestamp": time.Unix(0, 200).UTC(),
|
|
"max_timestamp.timestamp": time.Unix(0, 0).UTC(),
|
|
"row_count.int64": int64(0),
|
|
"uncompressed_size.int64": int64(0),
|
|
"service_name.label.utf8": "svc",
|
|
},
|
|
{
|
|
"object_path.utf8": "c",
|
|
"section_index.int64": int64(0),
|
|
"sort_schema.utf8": "service_name",
|
|
"min_timestamp.timestamp": time.Unix(0, 300).UTC(),
|
|
"max_timestamp.timestamp": time.Unix(0, 0).UTC(),
|
|
"row_count.int64": int64(0),
|
|
"uncompressed_size.int64": int64(0),
|
|
"service_name.label.utf8": "svc",
|
|
},
|
|
}
|
|
require.Equal(t, expected, actual)
|
|
}
|
|
|
|
// TestBuilder_MissingServiceName verifies rows with empty/missing label values sort before non-empty ones.
|
|
func TestBuilder_MissingServiceName(t *testing.T) {
|
|
b := NewBuilder(nil, defaultEncoder)
|
|
|
|
b.Append(Stat{SortSchema: "service_name", Labels: map[string]string{"service_name": ""}, ObjectPath: "obj1", MinTimestamp: 100})
|
|
b.Append(Stat{SortSchema: "service_name", Labels: map[string]string{"service_name": "svc"}, ObjectPath: "obj2", MinTimestamp: 200})
|
|
|
|
obj, closer := buildObject(t, b)
|
|
t.Cleanup(func() { _ = closer.Close() })
|
|
|
|
actual := readAllRowsFromObject(t, obj)
|
|
// Empty string sorts before "svc".
|
|
expected := arrowtest.Rows{
|
|
{
|
|
"object_path.utf8": "obj1",
|
|
"section_index.int64": int64(0),
|
|
"sort_schema.utf8": "service_name",
|
|
"min_timestamp.timestamp": time.Unix(0, 100).UTC(),
|
|
"max_timestamp.timestamp": time.Unix(0, 0).UTC(),
|
|
"row_count.int64": int64(0),
|
|
"uncompressed_size.int64": int64(0),
|
|
"service_name.label.utf8": "",
|
|
},
|
|
{
|
|
"object_path.utf8": "obj2",
|
|
"section_index.int64": int64(0),
|
|
"sort_schema.utf8": "service_name",
|
|
"min_timestamp.timestamp": time.Unix(0, 200).UTC(),
|
|
"max_timestamp.timestamp": time.Unix(0, 0).UTC(),
|
|
"row_count.int64": int64(0),
|
|
"uncompressed_size.int64": int64(0),
|
|
"service_name.label.utf8": "svc",
|
|
},
|
|
}
|
|
require.Equal(t, expected, actual)
|
|
}
|
|
|
|
// TestBuilder_SectionSplitting verifies the mid-accumulation flush pattern using dataobj.Builder:
|
|
// append rows, flush via dataobjBuilder.Append, append more, verify 2 sections.
|
|
func TestBuilder_SectionSplitting(t *testing.T) {
|
|
// Use a very small page size to force multiple pages.
|
|
smallEncoder := ColumnarSectionEncoder(100, 2)
|
|
b := NewBuilder(nil, smallEncoder)
|
|
b.SetTenant("test-tenant")
|
|
|
|
// Append first batch.
|
|
for i := range 3 {
|
|
b.Append(Stat{
|
|
ObjectPath: "x",
|
|
SortSchema: "service_name",
|
|
Labels: map[string]string{"service_name": "svc"},
|
|
MinTimestamp: int64(i * 100),
|
|
})
|
|
}
|
|
|
|
objBuilder := dataobj.NewBuilder(nil)
|
|
|
|
// Flush first batch into objBuilder.
|
|
require.NoError(t, objBuilder.Append(b))
|
|
|
|
// Append second batch.
|
|
for i := range 3 {
|
|
b.Append(Stat{
|
|
ObjectPath: "y",
|
|
SortSchema: "service_name",
|
|
Labels: map[string]string{"service_name": "svc"},
|
|
MinTimestamp: int64((i + 3) * 100),
|
|
})
|
|
}
|
|
|
|
// Flush second batch into objBuilder.
|
|
require.NoError(t, objBuilder.Append(b))
|
|
|
|
obj, closer, err := objBuilder.Flush()
|
|
require.NoError(t, err)
|
|
t.Cleanup(func() { _ = closer.Close() })
|
|
|
|
// Verify 2 sections were created.
|
|
var statsSections []*dataobj.Section
|
|
for _, s := range obj.Sections() {
|
|
if CheckSection(s) {
|
|
statsSections = append(statsSections, s)
|
|
}
|
|
}
|
|
require.Len(t, statsSections, 2, "expected 2 stats sections after two flushes")
|
|
|
|
// Collect all rows from both sections.
|
|
actual := readAllRowsFromObject(t, obj)
|
|
require.Len(t, actual, 6)
|
|
}
|
|
|
|
// TestBuilder_LargeValues verifies large label and path values round-trip correctly.
|
|
func TestBuilder_LargeValues(t *testing.T) {
|
|
b := NewBuilder(nil, defaultEncoder)
|
|
|
|
longPath := "/" + strings.Repeat("a", 10000)
|
|
longLabel := strings.Repeat("b", 5000)
|
|
longSchema := strings.Repeat("c", 2000)
|
|
|
|
b.Append(Stat{
|
|
ObjectPath: longPath,
|
|
SortSchema: longSchema,
|
|
Labels: map[string]string{longSchema: longLabel},
|
|
SectionIndex: 99,
|
|
MinTimestamp: 1_000_000,
|
|
MaxTimestamp: 2_000_000,
|
|
RowCount: 99999,
|
|
UncompressedSize: 1_000_000_000,
|
|
})
|
|
|
|
obj, closer := buildObject(t, b)
|
|
t.Cleanup(func() { _ = closer.Close() })
|
|
|
|
actual := readAllRowsFromObject(t, obj)
|
|
require.Len(t, actual, 1)
|
|
|
|
// Build the expected label column name dynamically
|
|
labelColName := longSchema + ".label.utf8"
|
|
expected := arrowtest.Rows{
|
|
{
|
|
"object_path.utf8": longPath,
|
|
"section_index.int64": int64(99),
|
|
"sort_schema.utf8": longSchema,
|
|
"min_timestamp.timestamp": time.Unix(0, 1_000_000).UTC(),
|
|
"max_timestamp.timestamp": time.Unix(0, 2_000_000).UTC(),
|
|
"row_count.int64": int64(99999),
|
|
"uncompressed_size.int64": int64(1_000_000_000),
|
|
labelColName: longLabel,
|
|
},
|
|
}
|
|
require.Equal(t, expected, actual)
|
|
}
|
|
|
|
// TestBuilder_ResetAndReuse verifies that Reset clears all rows and the builder can be reused.
|
|
func TestBuilder_ResetAndReuse(t *testing.T) {
|
|
b := NewBuilder(nil, defaultEncoder)
|
|
b.SetTenant("test-tenant")
|
|
|
|
b.Append(Stat{SortSchema: "service_name", Labels: map[string]string{"service_name": "first"}, MinTimestamp: 100})
|
|
b.Reset()
|
|
|
|
// After Reset, builder should be empty and produce no sections when flushed.
|
|
require.Empty(t, b.rows, "builder should be empty after reset")
|
|
require.Zero(t, b.EstimatedSize(), "estimated size should be zero after reset")
|
|
|
|
// Add new data after reset and flush.
|
|
b.Append(Stat{SortSchema: "service_name", Labels: map[string]string{"service_name": "second"}, MinTimestamp: 200})
|
|
|
|
obj, closer := buildObject(t, b)
|
|
t.Cleanup(func() { _ = closer.Close() })
|
|
|
|
actual := readAllRowsFromObject(t, obj)
|
|
expected := arrowtest.Rows{
|
|
{
|
|
"object_path.utf8": "",
|
|
"section_index.int64": int64(0),
|
|
"sort_schema.utf8": "service_name",
|
|
"min_timestamp.timestamp": time.Unix(0, 200).UTC(),
|
|
"max_timestamp.timestamp": time.Unix(0, 0).UTC(),
|
|
"row_count.int64": int64(0),
|
|
"uncompressed_size.int64": int64(0),
|
|
"service_name.label.utf8": "second",
|
|
},
|
|
}
|
|
require.Equal(t, expected, actual)
|
|
}
|
|
|
|
// TestBuilder_EstimatedSize verifies EstimatedSize returns non-zero after appending.
|
|
func TestBuilder_EstimatedSize(t *testing.T) {
|
|
b := NewBuilder(nil, defaultEncoder)
|
|
|
|
require.Zero(t, b.EstimatedSize(), "empty builder should have zero estimated size")
|
|
|
|
b.Append(Stat{
|
|
ObjectPath: "obj", // 3 bytes
|
|
SortSchema: "sch", // 3 bytes
|
|
Labels: map[string]string{"sch": "svc"}, // key: 3 bytes, value: 3 bytes
|
|
})
|
|
|
|
// 5 * 8 = 40 for int64s (SectionIndex, MinTimestamp, MaxTimestamp, RowCount, UncompressedSize)
|
|
// + 3 (ObjectPath) + 3 (SortSchema) + 3 (key) + 3 (value) = 52
|
|
require.Equal(t, 52, b.EstimatedSize())
|
|
}
|
|
|
|
// TestBuilder_FlushResetsBuilder verifies that a flush resets the builder state.
|
|
func TestBuilder_FlushResetsBuilder(t *testing.T) {
|
|
b := NewBuilder(nil, defaultEncoder)
|
|
b.Append(Stat{SortSchema: "service_name", Labels: map[string]string{"service_name": "svc"}, MinTimestamp: 100})
|
|
|
|
obj, closer := buildObject(t, b)
|
|
closer.Close()
|
|
require.Len(t, obj.Sections(), 1)
|
|
|
|
// After flush, builder should be empty (Reset was called).
|
|
require.Empty(t, b.rows, "builder should be empty after flush")
|
|
require.Zero(t, b.EstimatedSize(), "builder should have zero estimated size after flush")
|
|
}
|
|
|
|
// TestBuilder_Type verifies that the section type is correct.
|
|
func TestBuilder_Type(t *testing.T) {
|
|
b := NewBuilder(nil, defaultEncoder)
|
|
require.Equal(t, sectionType, b.Type())
|
|
}
|
|
|
|
// TestOpen_WrongSectionType verifies that Open rejects sections with the wrong type.
|
|
func TestOpen_WrongSectionType(t *testing.T) {
|
|
wrongType := &dataobj.Section{
|
|
Type: dataobj.SectionType{
|
|
Namespace: "github.com/grafana/loki",
|
|
Kind: "postings",
|
|
Version: 1,
|
|
},
|
|
}
|
|
_, err := Open(context.Background(), wrongType)
|
|
require.Error(t, err)
|
|
require.Contains(t, err.Error(), "section type mismatch")
|
|
}
|
|
|
|
// TestOpen_WrongVersion verifies that Open rejects sections with the wrong version.
|
|
func TestOpen_WrongVersion(t *testing.T) {
|
|
wrongVersion := &dataobj.Section{
|
|
Type: dataobj.SectionType{
|
|
Namespace: "github.com/grafana/loki",
|
|
Kind: "stats",
|
|
Version: 99,
|
|
},
|
|
}
|
|
_, err := Open(context.Background(), wrongVersion)
|
|
require.Error(t, err)
|
|
require.Contains(t, err.Error(), "unsupported section version")
|
|
}
|
|
|