From 653d7ac19ef157f39ee12dfc5fca22856e460b99 Mon Sep 17 00:00:00 2001 From: benclive Date: Mon, 1 Sep 2025 13:01:19 +0100 Subject: [PATCH] chore: Fix tenant info in sections when resetting (#19085) Co-authored-by: George Robinson --- pkg/dataobj/consumer/logsobj/builder.go | 2 ++ pkg/dataobj/consumer/logsobj/builder_test.go | 23 +++++++++++++++++--- pkg/logql/bench/bench_test.go | 17 +++++++++++++++ 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/pkg/dataobj/consumer/logsobj/builder.go b/pkg/dataobj/consumer/logsobj/builder.go index 6b5f652e0f..1144143631 100644 --- a/pkg/dataobj/consumer/logsobj/builder.go +++ b/pkg/dataobj/consumer/logsobj/builder.go @@ -235,6 +235,8 @@ func (b *Builder) Append(tenant string, stream logproto.Stream) error { if err := b.builder.Append(lb); err != nil { return err } + // We need to set the tenant again after flushing because the builder is reset. + lb.SetTenant(tenant) } } diff --git a/pkg/dataobj/consumer/logsobj/builder_test.go b/pkg/dataobj/consumer/logsobj/builder_test.go index fefedb5b0d..87be44db37 100644 --- a/pkg/dataobj/consumer/logsobj/builder_test.go +++ b/pkg/dataobj/consumer/logsobj/builder_test.go @@ -18,8 +18,8 @@ import ( var testBuilderConfig = BuilderConfig{ TargetPageSize: 2048, - TargetObjectSize: 1 << 22, // 4 MiB - TargetSectionSize: 1 << 21, // 2 MiB + TargetObjectSize: 1 << 20, // 1 MiB + TargetSectionSize: 1 << 19, // 512 KiB BufferSize: 2048 * 8, @@ -95,10 +95,12 @@ func TestBuilder_Append(t *testing.T) { builder, err := NewBuilder(testBuilderConfig, nil) require.NoError(t, err) + tenant := "test" + for { require.NoError(t, ctx.Err()) - err := builder.Append("test", logproto.Stream{ + err := builder.Append(tenant, logproto.Stream{ Labels: `{cluster="test",app="foo"}`, Entries: []push.Entry{{ Timestamp: time.Now().UTC(), @@ -110,4 +112,19 @@ func TestBuilder_Append(t *testing.T) { } require.NoError(t, err) } + + obj, closer, err := builder.Flush() + require.NoError(t, err) + defer closer.Close() + + // When a section builder is reset, which happens on ErrBuilderFull, the + // tenant is reset too. We must check that the tenant is added back + // to the section builder otherwise tenant will be absent from successive + // sections. + secs := obj.Sections() + require.Equal(t, 1, secs.Count(streams.CheckSection)) + require.Greater(t, secs.Count(logs.CheckSection), 1) + for _, section := range secs.Filter(logs.CheckSection) { + require.Equal(t, tenant, section.Tenant) + } } diff --git a/pkg/logql/bench/bench_test.go b/pkg/logql/bench/bench_test.go index 366115f56d..07f4a174ad 100644 --- a/pkg/logql/bench/bench_test.go +++ b/pkg/logql/bench/bench_test.go @@ -386,3 +386,20 @@ func TestPrintBenchmarkQueries(t *testing.T) { t.Logf("- Metric queries: %d (forward only)", metricQueries) t.Logf("- Total benchmark cases: %d", len(cases)) } + +func TestStoresGenerateData(t *testing.T) { + dir := t.TempDir() + + chunkStore, err := NewChunkStore(dir, testTenant) + if err != nil { + t.Fatal(err) + } + dataObjStore, err := NewDataObjStore(dir, testTenant) + if err != nil { + t.Fatal(err) + } + + builder := NewBuilder(dir, DefaultOpt(), chunkStore, dataObjStore) + err = builder.Generate(context.Background(), 1024) + require.NoError(t, err) +}