Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/dataobj/metastore/toc_replace.go

277 lines
8.8 KiB

package metastore
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"time"
"github.com/grafana/dskit/backoff"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/index/indexobj"
"github.com/grafana/loki/v3/pkg/dataobj/sections/indexpointers"
)
// TableOfContentsEntry describes an index-pointer row to add to a ToC for a
// given tenant. Used by ReplaceIndexPointers as the "to add" set.
type TableOfContentsEntry struct {
// Path is the object-storage path of the index object.
Path string
// StartTime / EndTime bound the time range covered by the index.
StartTime time.Time
EndTime time.Time
}
// replaceBackoffConfig bounds ReplaceIndexPointers retries on conditional-write
// rejection (412 PreconditionFailed) or transient bucket errors.
var replaceBackoffConfig = backoff.Config{
MinBackoff: 50 * time.Millisecond,
MaxBackoff: 10 * time.Second,
MaxRetries: 10,
}
// errReplaceNoOp is a sentinel returned from the GetAndReplace callback to
// signal "do not write". It is consumed by replaceIndexPointers and converted
// to (false, nil); it never escapes the package.
var errReplaceNoOp = errors.New("replace-index-pointers: no-op")
// ReplaceIndexPointers atomically swaps a set of index pointers in the
// ToC for the given window. For the target tenant, every row in oldPaths
// is removed and every entry in newEntries is added; all other tenants'
// entries are preserved unchanged.
//
// Returns (true, nil) if the swap was applied.
// Returns (false, nil) if there's nothing to do, examples are:
// - both oldPaths and newEntries are empty
// - oldPaths do not exist in TOC (race-loss)
// - TOC doesn't exist
// Returns (false, error) if an error happened (including retry exhaustion).
//
// Race-loss is detected on an ANY-match basis: if ANY oldPath is still
// present in the target tenant's current section, the swap proceeds and
// drops the matched subset (leaving non-matched oldPaths' replacements, if
// they were already swapped in by a concurrent coordinator, untouched).
// Only when ZERO oldPaths match is the call treated as a no-op. Callers
// orchestrating per-cycle plans against a single ToC snapshot will see
// all-or-nothing matches in practice; partial overlaps can only occur in
// rare cross-cycle races and produce bounded duplicate index entries that
// the next cycle's plan will re-merge.
//
// The primitive is idempotent: re-invoking it with already-applied
// oldPaths/newEntries is a no-op.
//
// Callers must serialize overlapping ReplaceIndexPointers calls for the
// same window within a process; the method allocates per-call state but
// does not coordinate across goroutines. Concurrent processes racing on
// the same window are safe because each call goes through a fresh
// GetAndReplace with conditional-PUT semantics.
func (m *TableOfContentsWriter) ReplaceIndexPointers(
ctx context.Context,
window time.Time,
tenant string,
oldPaths []string,
newEntries []TableOfContentsEntry,
) (bool, error) {
oldEmpty := len(oldPaths) == 0
newEmpty := len(newEntries) == 0
switch {
case oldEmpty && newEmpty:
return false, nil
case oldEmpty && !newEmpty:
return false, errors.New("replace-index-pointers: no new entries")
case !oldEmpty && newEmpty:
return false, errors.New("replace-index-pointers: no old entries")
default:
return m.replaceIndexPointers(ctx, window, tenant, oldPaths, newEntries, replaceBackoffConfig)
}
}
// replaceIndexPointers is the internal entrypoint that accepts an explicit
// backoff config, exposed only to same-package tests for retry-exhaustion
// coverage. Production callers go through ReplaceIndexPointers.
func (m *TableOfContentsWriter) replaceIndexPointers(
ctx context.Context,
window time.Time,
tenant string,
oldPaths []string,
newEntries []TableOfContentsEntry,
backoffCfg backoff.Config,
) (bool, error) {
tocPath := TableOfContentsPath(window.Truncate(MetastoreWindowSize).UTC())
oldSet := make(map[string]struct{}, len(oldPaths))
for _, p := range oldPaths {
oldSet[p] = struct{}{}
}
var (
swapped bool
lastErr error
)
b := backoff.New(ctx, backoffCfg)
for b.Ongoing() {
// Reset across retries — each attempt is independent.
swapped = false
err := m.bucket.GetAndReplace(ctx, tocPath, func(existing io.ReadCloser) (io.ReadCloser, error) {
// Drain `existing` fully up-front so we never return a drained reader.
var existingBytes []byte
if existing != nil {
defer existing.Close()
var rerr error
existingBytes, rerr = io.ReadAll(existing)
if rerr != nil {
return nil, fmt.Errorf("reading existing ToC: %w", rerr)
}
}
// Missing or empty ToC: nothing to remove → idempotent no-op.
// Abort the conditional PUT entirely so we don't materialize an empty
// object at tocPath. The outer loop converts this sentinel to (false, nil).
if len(existingBytes) == 0 {
return nil, errReplaceNoOp
}
obj, oerr := dataobj.FromReaderAt(bytes.NewReader(existingBytes), int64(len(existingBytes)))
if oerr != nil {
return nil, fmt.Errorf("parsing existing ToC: %w", oerr)
}
// Pass 1: detect whether any oldPaths are still present in the target tenant.
anyMatched, scanErr := scanForMatches(ctx, obj, tenant, oldSet)
if scanErr != nil {
return nil, scanErr
}
if !anyMatched {
// Race-loss / already-converged: do not modify the ToC.
// Returning the sentinel aborts the conditional PUT without
// writing a no-op blob. See the missing-ToC branch above.
return nil, errReplaceNoOp
}
// Pass 2: rebuild ToC, dropping target tenant's oldPaths and appending newEntries.
builder, berr := indexobj.NewBuilder(tocBuilderCfg, nil)
if berr != nil {
return nil, fmt.Errorf("creating ToC builder: %w", berr)
}
if err := replayFiltered(ctx, obj, builder, tenant, oldSet); err != nil {
return nil, err
}
for _, e := range newEntries {
if err := builder.AppendIndexPointer(tenant, e.Path, e.StartTime, e.EndTime); err != nil {
return nil, fmt.Errorf("appending new ToC entry: %w", err)
}
}
newObj, closer, ferr := builder.Flush()
if ferr != nil {
return nil, fmt.Errorf("flushing rebuilt ToC: %w", ferr)
}
reader, rerr := newObj.Reader(ctx)
if rerr != nil {
_ = closer.Close()
return nil, fmt.Errorf("opening rebuilt ToC reader: %w", rerr)
}
swapped = true
return &wrappedReadCloser{
rc: reader,
OnClose: func() error {
return errors.Join(reader.Close(), closer.Close())
},
}, nil
})
if err == nil {
return swapped, nil
}
if errors.Is(err, errReplaceNoOp) {
return swapped, nil
}
lastErr = err
b.Wait()
}
if lastErr == nil {
lastErr = b.Err()
}
return false, lastErr
}
// scanForMatches reports whether any row in any section of obj is owned by
// the target tenant AND has a path present in oldSet.
func scanForMatches(ctx context.Context, obj *dataobj.Object, tenant string, oldSet map[string]struct{}) (bool, error) {
var reader indexpointers.RowReader
defer reader.Close()
buf := make([]indexpointers.IndexPointer, 256)
for _, section := range obj.Sections().Filter(indexpointers.CheckSection) {
if section.Tenant != tenant {
continue
}
sec, err := indexpointers.Open(ctx, section)
if err != nil {
return false, fmt.Errorf("opening section: %w", err)
}
reader.Reset(sec)
if err := reader.Open(ctx); err != nil {
return false, fmt.Errorf("opening row reader: %w", err)
}
for {
n, err := reader.Read(ctx, buf)
for i := range n {
if _, ok := oldSet[buf[i].Path]; ok {
return true, nil
}
}
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return false, fmt.Errorf("reading rows: %w", err)
}
}
}
return false, nil
}
// replayFiltered replays every row from every section of obj into builder,
// EXCEPT rows belonging to the target tenant whose path is in oldSet.
func replayFiltered(ctx context.Context, obj *dataobj.Object, builder *indexobj.Builder, tenant string, oldSet map[string]struct{}) error {
var reader indexpointers.RowReader
defer reader.Close()
buf := make([]indexpointers.IndexPointer, 256)
for _, section := range obj.Sections().Filter(indexpointers.CheckSection) {
sec, err := indexpointers.Open(ctx, section)
if err != nil {
return fmt.Errorf("opening section: %w", err)
}
sectionTenant := section.Tenant
reader.Reset(sec)
if err := reader.Open(ctx); err != nil {
return fmt.Errorf("opening row reader: %w", err)
}
for {
n, err := reader.Read(ctx, buf)
for i := range n {
if sectionTenant == tenant {
if _, drop := oldSet[buf[i].Path]; drop {
continue
}
}
if aerr := builder.AppendIndexPointer(sectionTenant, buf[i].Path, buf[i].StartTs, buf[i].EndTs); aerr != nil {
return fmt.Errorf("replaying index pointer: %w", aerr)
}
}
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return fmt.Errorf("reading rows: %w", err)
}
}
}
return nil
}