chore: Fix tenant info in sections when resetting (#19085)

Co-authored-by: George Robinson <george.robinson@grafana.com>
pull/14419/merge
benclive 4 months ago committed by GitHub
parent a03e86c858
commit 653d7ac19e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 2
      pkg/dataobj/consumer/logsobj/builder.go
  2. 23
      pkg/dataobj/consumer/logsobj/builder_test.go
  3. 17
      pkg/logql/bench/bench_test.go

@ -235,6 +235,8 @@ func (b *Builder) Append(tenant string, stream logproto.Stream) error {
if err := b.builder.Append(lb); err != nil {
return err
}
// We need to set the tenant again after flushing because the builder is reset.
lb.SetTenant(tenant)
}
}

@ -18,8 +18,8 @@ import (
var testBuilderConfig = BuilderConfig{
TargetPageSize: 2048,
TargetObjectSize: 1 << 22, // 4 MiB
TargetSectionSize: 1 << 21, // 2 MiB
TargetObjectSize: 1 << 20, // 1 MiB
TargetSectionSize: 1 << 19, // 512 KiB
BufferSize: 2048 * 8,
@ -95,10 +95,12 @@ func TestBuilder_Append(t *testing.T) {
builder, err := NewBuilder(testBuilderConfig, nil)
require.NoError(t, err)
tenant := "test"
for {
require.NoError(t, ctx.Err())
err := builder.Append("test", logproto.Stream{
err := builder.Append(tenant, logproto.Stream{
Labels: `{cluster="test",app="foo"}`,
Entries: []push.Entry{{
Timestamp: time.Now().UTC(),
@ -110,4 +112,19 @@ func TestBuilder_Append(t *testing.T) {
}
require.NoError(t, err)
}
obj, closer, err := builder.Flush()
require.NoError(t, err)
defer closer.Close()
// When a section builder is reset, which happens on ErrBuilderFull, the
// tenant is reset too. We must check that the tenant is added back
// to the section builder otherwise tenant will be absent from successive
// sections.
secs := obj.Sections()
require.Equal(t, 1, secs.Count(streams.CheckSection))
require.Greater(t, secs.Count(logs.CheckSection), 1)
for _, section := range secs.Filter(logs.CheckSection) {
require.Equal(t, tenant, section.Tenant)
}
}

@ -386,3 +386,20 @@ func TestPrintBenchmarkQueries(t *testing.T) {
t.Logf("- Metric queries: %d (forward only)", metricQueries)
t.Logf("- Total benchmark cases: %d", len(cases))
}
func TestStoresGenerateData(t *testing.T) {
dir := t.TempDir()
chunkStore, err := NewChunkStore(dir, testTenant)
if err != nil {
t.Fatal(err)
}
dataObjStore, err := NewDataObjStore(dir, testTenant)
if err != nil {
t.Fatal(err)
}
builder := NewBuilder(dir, DefaultOpt(), chunkStore, dataObjStore)
err = builder.Generate(context.Background(), 1024)
require.NoError(t, err)
}

Loading…
Cancel
Save