Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/dataobj/metastore/toc_replace_test.go

490 lines
15 KiB

package metastore
import (
"bytes"
"context"
stderrors "errors"
"io"
"sort"
"sync"
"testing"
"time"
"github.com/go-kit/log"
"github.com/grafana/dskit/backoff"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/index/indexobj"
"github.com/grafana/loki/v3/pkg/dataobj/sections/indexpointers"
)
// tocRow is one ToC index pointer flattened to (tenant, path, start, end) so
// that tests can compare expected and actual ToC contents directly.
//
// Field order is significant: test fixtures build rows with positional
// composite literals.
type tocRow struct {
	Tenant, Path       string // owning tenant and index object path
	StartUnix, EndUnix int64  // pointer time range as Unix seconds
}
// readToC downloads the ToC object stored at path in bucket and returns every
// index pointer it contains as a flattened tocRow, tagged with the tenant of
// the section it was read from.
//
// Rows are returned sorted by tenant, then path, then start and end timestamp,
// so the order is fully deterministic and the result can be compared with
// require.Equal. Any failure to fetch, decode, or read the object fails the
// test immediately.
func readToC(ctx context.Context, t *testing.T, bucket objstore.Bucket, path string) []tocRow {
	t.Helper()

	rc, err := bucket.Get(ctx, path)
	require.NoError(t, err)
	defer rc.Close()

	raw, err := io.ReadAll(rc)
	require.NoError(t, err)

	obj, err := dataobj.FromReaderAt(bytes.NewReader(raw), int64(len(raw)))
	require.NoError(t, err)

	var rows []tocRow

	// A single reader is reused for every section via Reset.
	var reader indexpointers.RowReader
	defer reader.Close()

	buf := make([]indexpointers.IndexPointer, 64)
	for _, section := range obj.Sections().Filter(indexpointers.CheckSection) {
		sec, err := indexpointers.Open(ctx, section)
		require.NoError(t, err)

		reader.Reset(sec)
		require.NoError(t, reader.Open(ctx))

		for {
			n, err := reader.Read(ctx, buf)

			// Collect rows before looking at err so that rows returned
			// alongside an error (including EOF) are not dropped.
			for i := range n {
				rows = append(rows, tocRow{
					Tenant:    section.Tenant,
					Path:      buf[i].Path,
					StartUnix: buf[i].StartTs.UTC().Unix(),
					EndUnix:   buf[i].EndTs.UTC().Unix(),
				})
			}

			// errors.Is also recognizes an EOF that has been wrapped.
			if stderrors.Is(err, io.EOF) {
				break
			}
			require.NoError(t, err)
			if n == 0 {
				break
			}
		}
	}

	// sort.Slice is not stable, so every field takes part in the ordering;
	// rows sharing tenant and path would otherwise come back in varying order.
	sort.Slice(rows, func(i, j int) bool {
		a, b := rows[i], rows[j]
		switch {
		case a.Tenant != b.Tenant:
			return a.Tenant < b.Tenant
		case a.Path != b.Path:
			return a.Path < b.Path
		case a.StartUnix != b.StartUnix:
			return a.StartUnix < b.StartUnix
		default:
			return a.EndUnix < b.EndUnix
		}
	})
	return rows
}
// seedToC builds a ToC object holding the supplied (tenant, path, start, end)
// rows and uploads it to bucket at the ToC path for window.
// It goes through the same indexobj.Builder + tocBuilderCfg path that the
// production writer uses. Any failure fails the test immediately.
func seedToC(t *testing.T, bucket objstore.Bucket, window time.Time, rows []tocRow) {
	t.Helper()
	ctx := t.Context()

	builder, err := indexobj.NewBuilder(tocBuilderCfg, nil)
	require.NoError(t, err)

	for _, row := range rows {
		start := time.Unix(row.StartUnix, 0).UTC()
		end := time.Unix(row.EndUnix, 0).UTC()
		require.NoError(t, builder.AppendIndexPointer(row.Tenant, row.Path, start, end))
	}

	obj, closer, err := builder.Flush()
	require.NoError(t, err)
	// The flushed object is released when the test finishes.
	t.Cleanup(func() { _ = closer.Close() })

	src, err := obj.Reader(ctx)
	require.NoError(t, err)
	defer src.Close()

	require.NoError(t, bucket.Upload(ctx, TableOfContentsPath(window), src))
}
// TestReplaceIndexPointers_RoundTrip seeds a ToC with two tenants, replaces
// both of tenantA's pointers with a single new one, and verifies the stored
// ToC contains the new tenantA pointer while tenantB's rows are untouched.
func TestReplaceIndexPointers_RoundTrip(t *testing.T) {
	var (
		ctx    = context.Background()
		window = unixTime(0)
		bucket = objstore.NewInMemBucket()
	)

	seedToC(t, bucket, window, []tocRow{
		{"tenantA", "idx/a-0", 10, 20},
		{"tenantA", "idx/a-1", 30, 40},
		{"tenantB", "idx/b-0", 11, 21},
		{"tenantB", "idx/b-1", 31, 41},
	})

	tocWriter := &TableOfContentsWriter{
		bucket:      bucket,
		metrics:     newTableOfContentsMetrics(),
		logger:      log.NewNopLogger(),
		builderOnce: sync.Once{},
	}

	stalePaths := []string{"idx/a-0", "idx/a-1"}
	replacement := []TableOfContentsEntry{
		{Path: "idx/a-new", StartTime: unixTime(100), EndTime: unixTime(110)},
	}
	swapped, err := tocWriter.ReplaceIndexPointers(ctx, window, "tenantA", stalePaths, replacement)
	require.NoError(t, err)
	require.True(t, swapped, "expected swap to apply")

	expected := []tocRow{
		{"tenantA", "idx/a-new", 100, 110},
		{"tenantB", "idx/b-0", 11, 21},
		{"tenantB", "idx/b-1", 31, 41},
	}
	actual := readToC(ctx, t, bucket, TableOfContentsPath(window))
	require.Equal(t, expected, actual)
}
// TestReplaceIndexPointers_MultiTenantPreservation checks, for several ToC
// layouts, that replacing one tenant's index pointers leaves that tenant with
// exactly the expected rows and leaves every other tenant's rows unchanged.
func TestReplaceIndexPointers_MultiTenantPreservation(t *testing.T) {
	cases := []struct {
		name           string
		seedRows       []tocRow
		targetTenant   string
		oldPaths       []string
		newEntries     []TableOfContentsEntry
		wantTargetRows []tocRow
		otherTenants   []string
	}{
		{
			// Every tenant owns its own, disjoint set of idx/... paths.
			// This is the L1 → L1 re-compaction shape.
			name: "disjoint indexes per tenant",
			seedRows: []tocRow{
				{"tenantA", "idx/a-0", 10, 20},
				{"tenantA", "idx/a-1", 30, 40},
				{"tenantA", "idx/a-2", 50, 60},
				{"tenantB", "idx/b-0", 11, 21},
				{"tenantB", "idx/b-1", 31, 41},
				{"tenantB", "idx/b-2", 51, 61},
				{"tenantC", "idx/c-0", 12, 22},
				{"tenantC", "idx/c-1", 32, 42},
				{"tenantC", "idx/c-2", 52, 62},
			},
			targetTenant: "tenantA",
			oldPaths:     []string{"idx/a-0", "idx/a-1", "idx/a-2"},
			newEntries: []TableOfContentsEntry{
				{Path: "idx/a-merged", StartTime: unixTime(10), EndTime: unixTime(60)},
			},
			wantTargetRows: []tocRow{{"tenantA", "idx/a-merged", 10, 60}},
			otherTenants:   []string{"tenantB", "tenantC"},
		},
		{
			// L0 → L1 compaction shape: L0 indexes are multi-tenant, so the
			// same idx/... path is referenced from several tenants' sections.
			// Compacting tenantA must remove those paths from tenantA's
			// section ONLY; tenantB's references to the same shared L0 paths
			// must survive. This exercises the `sectionTenant == tenant` guard.
			name: "shared L0 indexes across tenants",
			seedRows: []tocRow{
				{"tenantA", "idx/l0-shared-0", 10, 20},
				{"tenantB", "idx/l0-shared-0", 10, 20},
				{"tenantA", "idx/l0-shared-1", 30, 40},
				{"tenantB", "idx/l0-shared-1", 30, 40},
				{"tenantC", "idx/c-0", 12, 22},
			},
			targetTenant: "tenantA",
			oldPaths:     []string{"idx/l0-shared-0", "idx/l0-shared-1"},
			newEntries: []TableOfContentsEntry{
				{Path: "idx/a-l1", StartTime: unixTime(10), EndTime: unixTime(40)},
			},
			wantTargetRows: []tocRow{{"tenantA", "idx/a-l1", 10, 40}},
			otherTenants:   []string{"tenantB", "tenantC"},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			var (
				ctx     = context.Background()
				window  = unixTime(0)
				bucket  = objstore.NewInMemBucket()
				tocPath = TableOfContentsPath(window)
			)
			seedToC(t, bucket, window, tc.seedRows)

			// Snapshot the non-target tenants before the swap so the
			// post-swap state can be compared against it verbatim.
			bystandersBefore := filterRows(readToC(ctx, t, bucket, tocPath), tc.otherTenants...)

			tocWriter := &TableOfContentsWriter{
				bucket:      bucket,
				metrics:     newTableOfContentsMetrics(),
				logger:      log.NewNopLogger(),
				builderOnce: sync.Once{},
			}

			swapped, err := tocWriter.ReplaceIndexPointers(ctx, window, tc.targetTenant, tc.oldPaths, tc.newEntries)
			require.NoError(t, err)
			require.True(t, swapped, "expected %s swap to apply", tc.targetTenant)

			after := readToC(ctx, t, bucket, tocPath)

			// 1. The target tenant holds exactly the expected rows.
			require.Equal(t, tc.wantTargetRows, filterRows(after, tc.targetTenant))

			// 2. All other tenants are untouched, even where their own
			//    sections reference paths listed in oldPaths.
			bystandersAfter := filterRows(after, tc.otherTenants...)
			require.Equal(t, bystandersBefore, bystandersAfter,
				"non-target tenant rows must be preserved unchanged")
		})
	}
}
// filterRows returns the rows whose Tenant is one of tenants, keeping the
// input order. The result is always a non-nil slice, even when nothing
// matches, so it compares unequal to a nil slice under reflect.DeepEqual.
func filterRows(rows []tocRow, tenants ...string) []tocRow {
	wanted := make(map[string]struct{}, len(tenants))
	for i := range tenants {
		wanted[tenants[i]] = struct{}{}
	}

	kept := make([]tocRow, 0, len(rows))
	for _, row := range rows {
		if _, found := wanted[row.Tenant]; !found {
			continue
		}
		kept = append(kept, row)
	}
	return kept
}
// TestReplaceIndexPointers_RaceLossOldPathsAlreadyGone verifies that a swap
// whose oldPaths are no longer present in the ToC is reported as not applied
// and leaves the stored ToC unchanged.
func TestReplaceIndexPointers_RaceLossOldPathsAlreadyGone(t *testing.T) {
	var (
		ctx    = context.Background()
		window = unixTime(0)
		bucket = objstore.NewInMemBucket()
	)
	tocPath := TableOfContentsPath(window)

	seedToC(t, bucket, window, []tocRow{
		{"tenantA", "idx/a-already-rolled-up", 10, 60}, // simulates "the other coordinator's swap already landed"
		{"tenantB", "idx/b-0", 11, 21},
	})
	before := readToC(ctx, t, bucket, tocPath)

	tocWriter := &TableOfContentsWriter{
		bucket:      bucket,
		metrics:     newTableOfContentsMetrics(),
		logger:      log.NewNopLogger(),
		builderOnce: sync.Once{},
	}

	// The caller still assumes "idx/a-0" / "idx/a-1" exist; the seeded ToC
	// contains neither of them.
	stalePaths := []string{"idx/a-0", "idx/a-1"}
	replacement := []TableOfContentsEntry{
		{Path: "idx/a-new", StartTime: unixTime(10), EndTime: unixTime(60)},
	}
	swapped, err := tocWriter.ReplaceIndexPointers(ctx, window, "tenantA", stalePaths, replacement)
	require.NoError(t, err)
	require.False(t, swapped, "expected no-op when oldPaths are no longer present")

	after := readToC(ctx, t, bucket, tocPath)
	require.Equal(t, before, after, "ToC must be unchanged on race-loss")
}
// TestReplaceIndexPointers_MissingToC verifies that a swap against a window
// with no ToC object is reported as not applied, returns no error, and does
// not create an object at the ToC path.
func TestReplaceIndexPointers_MissingToC(t *testing.T) {
	var (
		ctx    = context.Background()
		window = unixTime(0)
		bucket = objstore.NewInMemBucket() // deliberately left empty
	)

	tocWriter := &TableOfContentsWriter{
		bucket:      bucket,
		metrics:     newTableOfContentsMetrics(),
		logger:      log.NewNopLogger(),
		builderOnce: sync.Once{},
	}

	replacement := []TableOfContentsEntry{
		{Path: "idx/a-new", StartTime: unixTime(10), EndTime: unixTime(20)},
	}
	swapped, err := tocWriter.ReplaceIndexPointers(ctx, window, "tenantA", []string{"idx/a-0"}, replacement)
	require.NoError(t, err)
	require.False(t, swapped, "missing ToC must no-op")

	// The no-op must NOT have materialized an empty object at the ToC path.
	found, err := bucket.Exists(ctx, TableOfContentsPath(window))
	require.NoError(t, err)
	require.False(t, found, "missing-ToC no-op must not create an empty ToC blob")
}
// flakyBucket decorates an objstore.Bucket with a queue of injected failures.
// Each GetAndReplace call pops the next queued error and returns it WITHOUT
// invoking the callback; once the queue is empty, calls are forwarded to the
// wrapped bucket. Used to simulate a 412 PreconditionFailed on the first
// attempt.
type flakyBucket struct {
	objstore.Bucket

	mu              sync.Mutex // guards remainingErrors
	remainingErrors []error    // errors still to be injected, in order
}

// GetAndReplace returns the next queued error if one remains; otherwise it
// delegates to the wrapped bucket.
func (b *flakyBucket) GetAndReplace(ctx context.Context, name string, fn func(io.ReadCloser) (io.ReadCloser, error)) error {
	var injected error

	b.mu.Lock()
	pending := len(b.remainingErrors) > 0
	if pending {
		injected, b.remainingErrors = b.remainingErrors[0], b.remainingErrors[1:]
	}
	b.mu.Unlock()

	// The lock is released before delegating so the wrapped call never runs
	// under it.
	if !pending {
		return b.Bucket.GetAndReplace(ctx, name, fn)
	}
	return injected
}
// alwaysFailBucket decorates an objstore.Bucket so that every GetAndReplace
// call fails with errPreconditionFailed. It drives the retry-exhaustion test.
type alwaysFailBucket struct {
	objstore.Bucket
}

// GetAndReplace ignores all of its arguments, never invokes the callback, and
// unconditionally returns errPreconditionFailed.
func (*alwaysFailBucket) GetAndReplace(context.Context, string, func(io.ReadCloser) (io.ReadCloser, error)) error {
	return errPreconditionFailed
}
// errPreconditionFailed is a synthetic 412-shaped error used by the retry tests.
var errPreconditionFailed = stderrors.New("PreconditionFailed: simulated If-Match mismatch")
// TestReplaceIndexPointers_RetriesOnConditionalWriteFailure verifies that a
// single conditional-write failure from the bucket is retried: the first
// GetAndReplace call fails with errPreconditionFailed, and the swap must still
// succeed and be visible in the underlying bucket.
func TestReplaceIndexPointers_RetriesOnConditionalWriteFailure(t *testing.T) {
	ctx := context.Background()
	window := unixTime(0)

	inner := objstore.NewInMemBucket()
	seedToC(t, inner, window, []tocRow{
		{"tenantA", "idx/a-0", 10, 20},
		{"tenantB", "idx/b-0", 11, 21},
	})

	// Fail exactly one GetAndReplace call, then pass through to inner.
	flaky := &flakyBucket{
		Bucket:          inner,
		remainingErrors: []error{errPreconditionFailed},
	}
	writer := &TableOfContentsWriter{
		bucket:      flaky,
		metrics:     newTableOfContentsMetrics(),
		logger:      log.NewNopLogger(),
		builderOnce: sync.Once{},
	}

	swapped, err := writer.ReplaceIndexPointers(ctx, window, "tenantA",
		[]string{"idx/a-0"},
		[]TableOfContentsEntry{
			{Path: "idx/a-new", StartTime: unixTime(100), EndTime: unixTime(110)},
		},
	)
	require.NoError(t, err)
	require.True(t, swapped)

	// Guard against a vacuous pass: the injected failure must have been
	// consumed, proving the swap went through GetAndReplace and was retried.
	flaky.mu.Lock()
	unconsumed := len(flaky.remainingErrors)
	flaky.mu.Unlock()
	require.Zero(t, unconsumed, "injected conditional-write failure was never hit")

	// Read from inner directly so the assertion reflects what was stored.
	got := readToC(ctx, t, inner, TableOfContentsPath(window))
	require.Equal(t, []tocRow{
		{"tenantA", "idx/a-new", 100, 110},
		{"tenantB", "idx/b-0", 11, 21},
	}, got)
}
// TestReplaceIndexPointers_RetryExhaustion verifies that when every
// GetAndReplace call fails, the swap gives up, reports not-swapped, and
// returns an error that matches errPreconditionFailed.
func TestReplaceIndexPointers_RetryExhaustion(t *testing.T) {
	var (
		ctx    = context.Background()
		window = unixTime(0)
		inner  = objstore.NewInMemBucket()
	)
	seedToC(t, inner, window, []tocRow{{"tenantA", "idx/a-0", 10, 20}})

	// Every GetAndReplace call made through this wrapper returns
	// errPreconditionFailed.
	tocWriter := &TableOfContentsWriter{
		bucket:      &alwaysFailBucket{Bucket: inner},
		metrics:     newTableOfContentsMetrics(),
		logger:      log.NewNopLogger(),
		builderOnce: sync.Once{},
	}

	// Call the same-package internal helper so the backoff can be overridden
	// with a tight budget, keeping this test fast (<100ms) while still
	// exercising the retry loop.
	retryBudget := backoff.Config{
		MinBackoff: 1 * time.Millisecond,
		MaxBackoff: 5 * time.Millisecond,
		MaxRetries: 3,
	}

	swapped, err := tocWriter.replaceIndexPointers(ctx, window, "tenantA", []string{"idx/a-0"}, nil, retryBudget)
	require.Error(t, err)
	require.ErrorIs(t, err, errPreconditionFailed)
	require.False(t, swapped)
}
// countingFailBucket decorates an objstore.Bucket, counting GetAndReplace
// calls and failing each of them. Tests use the counter to prove that a code
// path bypasses storage entirely.
type countingFailBucket struct {
	objstore.Bucket

	mu    sync.Mutex // guards calls
	calls int        // number of GetAndReplace invocations so far
}

// GetAndReplace records the call and returns errPreconditionFailed without
// invoking the callback or touching the wrapped bucket.
func (b *countingFailBucket) GetAndReplace(context.Context, string, func(io.ReadCloser) (io.ReadCloser, error)) error {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.calls++
	return errPreconditionFailed
}
// TestReplaceIndexPointers_EmptyOldAndNewPaths_BypassesStorage verifies that a
// swap with no old paths and no new entries is a no-op that never calls
// GetAndReplace, for both nil and empty-but-non-nil arguments.
func TestReplaceIndexPointers_EmptyOldAndNewPaths_BypassesStorage(t *testing.T) {
	var (
		ctx    = context.Background()
		window = unixTime(0)
		bucket = &countingFailBucket{Bucket: objstore.NewInMemBucket()}
	)
	tocWriter := &TableOfContentsWriter{
		bucket:      bucket,
		metrics:     newTableOfContentsMetrics(),
		logger:      log.NewNopLogger(),
		builderOnce: sync.Once{},
	}

	// The bucket fails every GetAndReplace call, so a clean no-op is only
	// possible if storage is never touched. This is the deterministic-no-op
	// contract, and it must hold for nil and for empty (non-nil) slices alike.
	inputs := []struct {
		oldPaths   []string
		newEntries []TableOfContentsEntry
	}{
		{oldPaths: nil, newEntries: nil},
		{oldPaths: []string{}, newEntries: []TableOfContentsEntry{}},
	}
	for _, in := range inputs {
		swapped, err := tocWriter.ReplaceIndexPointers(ctx, window, "tenantA", in.oldPaths, in.newEntries)
		require.NoError(t, err)
		require.False(t, swapped)
		require.Equal(t, 0, bucket.calls, "empty oldPaths and newEntries must bypass GetAndReplace entirely")
	}
}
// TestReplaceIndexPointers_EmptyOldOrNewPaths_Errors verifies that a swap with
// exactly one of oldPaths / newEntries empty is rejected with an error, is
// reported as not applied, and never calls GetAndReplace.
func TestReplaceIndexPointers_EmptyOldOrNewPaths_Errors(t *testing.T) {
	var (
		ctx    = context.Background()
		window = unixTime(0)
		bucket = &countingFailBucket{Bucket: objstore.NewInMemBucket()}
	)
	tocWriter := &TableOfContentsWriter{
		bucket:      bucket,
		metrics:     newTableOfContentsMetrics(),
		logger:      log.NewNopLogger(),
		builderOnce: sync.Once{},
	}

	// Case 1: nothing to remove but something to add => error, storage untouched.
	onlyNew := []TableOfContentsEntry{
		{Path: "idx/a-new", StartTime: unixTime(100), EndTime: unixTime(110)},
	}
	swapped, err := tocWriter.ReplaceIndexPointers(ctx, window, "tenantA", nil, onlyNew)
	require.Error(t, err)
	require.False(t, swapped)
	require.Equal(t, 0, bucket.calls, "must bypass GetAndReplace entirely")

	// Case 2: something to remove but nothing to add => error, storage untouched.
	onlyOld := []string{"idx/a-old"}
	swapped, err = tocWriter.ReplaceIndexPointers(ctx, window, "tenantA", onlyOld, nil)
	require.Error(t, err)
	require.False(t, swapped)
	require.Equal(t, 0, bucket.calls, "must bypass GetAndReplace entirely")
}