mirror of https://github.com/grafana/loki
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
277 lines
8.8 KiB
277 lines
8.8 KiB
package metastore
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"time"
|
|
|
|
"github.com/grafana/dskit/backoff"
|
|
|
|
"github.com/grafana/loki/v3/pkg/dataobj"
|
|
"github.com/grafana/loki/v3/pkg/dataobj/index/indexobj"
|
|
"github.com/grafana/loki/v3/pkg/dataobj/sections/indexpointers"
|
|
)
|
|
|
|
// TableOfContentsEntry is one index-pointer row to be written into a tenant's
// table of contents (ToC). ReplaceIndexPointers receives a slice of these as
// the set of rows to add.
type TableOfContentsEntry struct {
	// Path locates the index object in object storage.
	Path string

	// StartTime and EndTime delimit the time range the index covers.
	StartTime, EndTime time.Time
}
|
|
|
|
// replaceBackoffConfig bounds ReplaceIndexPointers retries on conditional-write
|
|
// rejection (412 PreconditionFailed) or transient bucket errors.
|
|
var replaceBackoffConfig = backoff.Config{
|
|
MinBackoff: 50 * time.Millisecond,
|
|
MaxBackoff: 10 * time.Second,
|
|
MaxRetries: 10,
|
|
}
|
|
|
|
// errReplaceNoOp is a sentinel returned from the GetAndReplace callback to
|
|
// signal "do not write". It is consumed by replaceIndexPointers and converted
|
|
// to (false, nil); it never escapes the package.
|
|
var errReplaceNoOp = errors.New("replace-index-pointers: no-op")
|
|
|
|
// ReplaceIndexPointers atomically swaps a set of index pointers in the
|
|
// ToC for the given window. For the target tenant, every row in oldPaths
|
|
// is removed and every entry in newEntries is added; all other tenants'
|
|
// entries are preserved unchanged.
|
|
//
|
|
// Returns (true, nil) if the swap was applied.
|
|
// Returns (false, nil) if there's nothing to do, examples are:
|
|
// - both oldPaths and newEntries are empty
|
|
// - oldPaths do not exist in TOC (race-loss)
|
|
// - TOC doesn't exist
|
|
// Returns (false, error) if an error happened (including retry exhaustion).
|
|
//
|
|
// Race-loss is detected on an ANY-match basis: if ANY oldPath is still
|
|
// present in the target tenant's current section, the swap proceeds and
|
|
// drops the matched subset (leaving non-matched oldPaths' replacements, if
|
|
// they were already swapped in by a concurrent coordinator, untouched).
|
|
// Only when ZERO oldPaths match is the call treated as a no-op. Callers
|
|
// orchestrating per-cycle plans against a single ToC snapshot will see
|
|
// all-or-nothing matches in practice; partial overlaps can only occur in
|
|
// rare cross-cycle races and produce bounded duplicate index entries that
|
|
// the next cycle's plan will re-merge.
|
|
//
|
|
// The primitive is idempotent: re-invoking it with already-applied
|
|
// oldPaths/newEntries is a no-op.
|
|
//
|
|
// Callers must serialize overlapping ReplaceIndexPointers calls for the
|
|
// same window within a process; the method allocates per-call state but
|
|
// does not coordinate across goroutines. Concurrent processes racing on
|
|
// the same window are safe because each call goes through a fresh
|
|
// GetAndReplace with conditional-PUT semantics.
|
|
func (m *TableOfContentsWriter) ReplaceIndexPointers(
|
|
ctx context.Context,
|
|
window time.Time,
|
|
tenant string,
|
|
oldPaths []string,
|
|
newEntries []TableOfContentsEntry,
|
|
) (bool, error) {
|
|
oldEmpty := len(oldPaths) == 0
|
|
newEmpty := len(newEntries) == 0
|
|
switch {
|
|
case oldEmpty && newEmpty:
|
|
return false, nil
|
|
case oldEmpty && !newEmpty:
|
|
return false, errors.New("replace-index-pointers: no new entries")
|
|
case !oldEmpty && newEmpty:
|
|
return false, errors.New("replace-index-pointers: no old entries")
|
|
default:
|
|
return m.replaceIndexPointers(ctx, window, tenant, oldPaths, newEntries, replaceBackoffConfig)
|
|
}
|
|
}
|
|
|
|
// replaceIndexPointers is the internal entrypoint that accepts an explicit
|
|
// backoff config, exposed only to same-package tests for retry-exhaustion
|
|
// coverage. Production callers go through ReplaceIndexPointers.
|
|
func (m *TableOfContentsWriter) replaceIndexPointers(
|
|
ctx context.Context,
|
|
window time.Time,
|
|
tenant string,
|
|
oldPaths []string,
|
|
newEntries []TableOfContentsEntry,
|
|
backoffCfg backoff.Config,
|
|
) (bool, error) {
|
|
tocPath := TableOfContentsPath(window.Truncate(MetastoreWindowSize).UTC())
|
|
|
|
oldSet := make(map[string]struct{}, len(oldPaths))
|
|
for _, p := range oldPaths {
|
|
oldSet[p] = struct{}{}
|
|
}
|
|
|
|
var (
|
|
swapped bool
|
|
lastErr error
|
|
)
|
|
b := backoff.New(ctx, backoffCfg)
|
|
for b.Ongoing() {
|
|
// Reset across retries — each attempt is independent.
|
|
swapped = false
|
|
|
|
err := m.bucket.GetAndReplace(ctx, tocPath, func(existing io.ReadCloser) (io.ReadCloser, error) {
|
|
// Drain `existing` fully up-front so we never return a drained reader.
|
|
var existingBytes []byte
|
|
if existing != nil {
|
|
defer existing.Close()
|
|
var rerr error
|
|
existingBytes, rerr = io.ReadAll(existing)
|
|
if rerr != nil {
|
|
return nil, fmt.Errorf("reading existing ToC: %w", rerr)
|
|
}
|
|
}
|
|
|
|
// Missing or empty ToC: nothing to remove → idempotent no-op.
|
|
// Abort the conditional PUT entirely so we don't materialize an empty
|
|
// object at tocPath. The outer loop converts this sentinel to (false, nil).
|
|
if len(existingBytes) == 0 {
|
|
return nil, errReplaceNoOp
|
|
}
|
|
|
|
obj, oerr := dataobj.FromReaderAt(bytes.NewReader(existingBytes), int64(len(existingBytes)))
|
|
if oerr != nil {
|
|
return nil, fmt.Errorf("parsing existing ToC: %w", oerr)
|
|
}
|
|
|
|
// Pass 1: detect whether any oldPaths are still present in the target tenant.
|
|
anyMatched, scanErr := scanForMatches(ctx, obj, tenant, oldSet)
|
|
if scanErr != nil {
|
|
return nil, scanErr
|
|
}
|
|
if !anyMatched {
|
|
// Race-loss / already-converged: do not modify the ToC.
|
|
// Returning the sentinel aborts the conditional PUT without
|
|
// writing a no-op blob. See the missing-ToC branch above.
|
|
return nil, errReplaceNoOp
|
|
}
|
|
|
|
// Pass 2: rebuild ToC, dropping target tenant's oldPaths and appending newEntries.
|
|
builder, berr := indexobj.NewBuilder(tocBuilderCfg, nil)
|
|
if berr != nil {
|
|
return nil, fmt.Errorf("creating ToC builder: %w", berr)
|
|
}
|
|
|
|
if err := replayFiltered(ctx, obj, builder, tenant, oldSet); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, e := range newEntries {
|
|
if err := builder.AppendIndexPointer(tenant, e.Path, e.StartTime, e.EndTime); err != nil {
|
|
return nil, fmt.Errorf("appending new ToC entry: %w", err)
|
|
}
|
|
}
|
|
|
|
newObj, closer, ferr := builder.Flush()
|
|
if ferr != nil {
|
|
return nil, fmt.Errorf("flushing rebuilt ToC: %w", ferr)
|
|
}
|
|
reader, rerr := newObj.Reader(ctx)
|
|
if rerr != nil {
|
|
_ = closer.Close()
|
|
return nil, fmt.Errorf("opening rebuilt ToC reader: %w", rerr)
|
|
}
|
|
swapped = true
|
|
return &wrappedReadCloser{
|
|
rc: reader,
|
|
OnClose: func() error {
|
|
return errors.Join(reader.Close(), closer.Close())
|
|
},
|
|
}, nil
|
|
})
|
|
|
|
if err == nil {
|
|
return swapped, nil
|
|
}
|
|
if errors.Is(err, errReplaceNoOp) {
|
|
return swapped, nil
|
|
}
|
|
lastErr = err
|
|
b.Wait()
|
|
}
|
|
if lastErr == nil {
|
|
lastErr = b.Err()
|
|
}
|
|
return false, lastErr
|
|
}
|
|
|
|
// scanForMatches reports whether any row in any section of obj is owned by
|
|
// the target tenant AND has a path present in oldSet.
|
|
func scanForMatches(ctx context.Context, obj *dataobj.Object, tenant string, oldSet map[string]struct{}) (bool, error) {
|
|
var reader indexpointers.RowReader
|
|
defer reader.Close()
|
|
buf := make([]indexpointers.IndexPointer, 256)
|
|
for _, section := range obj.Sections().Filter(indexpointers.CheckSection) {
|
|
if section.Tenant != tenant {
|
|
continue
|
|
}
|
|
sec, err := indexpointers.Open(ctx, section)
|
|
if err != nil {
|
|
return false, fmt.Errorf("opening section: %w", err)
|
|
}
|
|
reader.Reset(sec)
|
|
if err := reader.Open(ctx); err != nil {
|
|
return false, fmt.Errorf("opening row reader: %w", err)
|
|
}
|
|
for {
|
|
n, err := reader.Read(ctx, buf)
|
|
for i := range n {
|
|
if _, ok := oldSet[buf[i].Path]; ok {
|
|
return true, nil
|
|
}
|
|
}
|
|
if errors.Is(err, io.EOF) {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return false, fmt.Errorf("reading rows: %w", err)
|
|
}
|
|
}
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
// replayFiltered replays every row from every section of obj into builder,
|
|
// EXCEPT rows belonging to the target tenant whose path is in oldSet.
|
|
func replayFiltered(ctx context.Context, obj *dataobj.Object, builder *indexobj.Builder, tenant string, oldSet map[string]struct{}) error {
|
|
var reader indexpointers.RowReader
|
|
defer reader.Close()
|
|
buf := make([]indexpointers.IndexPointer, 256)
|
|
for _, section := range obj.Sections().Filter(indexpointers.CheckSection) {
|
|
sec, err := indexpointers.Open(ctx, section)
|
|
if err != nil {
|
|
return fmt.Errorf("opening section: %w", err)
|
|
}
|
|
sectionTenant := section.Tenant
|
|
reader.Reset(sec)
|
|
if err := reader.Open(ctx); err != nil {
|
|
return fmt.Errorf("opening row reader: %w", err)
|
|
}
|
|
for {
|
|
n, err := reader.Read(ctx, buf)
|
|
for i := range n {
|
|
if sectionTenant == tenant {
|
|
if _, drop := oldSet[buf[i].Path]; drop {
|
|
continue
|
|
}
|
|
}
|
|
if aerr := builder.AppendIndexPointer(sectionTenant, buf[i].Path, buf[i].StartTs, buf[i].EndTs); aerr != nil {
|
|
return fmt.Errorf("replaying index pointer: %w", aerr)
|
|
}
|
|
}
|
|
if errors.Is(err, io.EOF) {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("reading rows: %w", err)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|