mirror of https://github.com/grafana/loki
refactor(bloom planner): Compute gaps and build tasks from metas and TSDBs (#12994)
parent
7a3338ead8
commit
3195036435
@ -1,21 +1,40 @@ |
||||
package planner |
||||
|
||||
import "flag" |
||||
import ( |
||||
"flag" |
||||
"fmt" |
||||
"time" |
||||
) |
||||
|
||||
// Config configures the bloom-planner component.
|
||||
type Config struct {
	// PlanningInterval is registered as the "<prefix>.interval" flag.
	PlanningInterval time.Duration `yaml:"planning_interval"`

	// MinTableOffset is the newest day-table offset (from today, inclusive)
	// that is considered.
	MinTableOffset int `yaml:"min_table_offset"`

	// MaxTableOffset is the oldest day-table offset (from today, inclusive)
	// that is considered.
	MaxTableOffset int `yaml:"max_table_offset"`
}

// RegisterFlagsWithPrefix registers flags for the bloom-planner configuration.
// Every flag name is built as prefix + "." + <name>.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	name := func(suffix string) string { return prefix + "." + suffix }

	f.DurationVar(
		&cfg.PlanningInterval,
		name("interval"),
		8*time.Hour,
		"Interval at which to re-run the bloom creation planning.",
	)
	f.IntVar(
		&cfg.MinTableOffset,
		name("min-table-offset"),
		1,
		"Newest day-table offset (from today, inclusive) to build blooms for. Increase to lower cost by not re-writing data to object storage too frequently since recent data changes more often at the cost of not having blooms available as quickly.",
	)

	// TODO(owen-d): ideally we'd set this per tenant based on their `reject_old_samples_max_age` setting,
	// but due to how we need to discover tenants, we can't do that yet. Tenant+Period discovery is done by
	// iterating the table periods in object storage and looking for tenants within that period.
	// In order to have this done dynamically, we'd need to account for tenant specific overrides, which are also
	// dynamically reloaded.
	// I'm doing it the simple way for now.
	f.IntVar(
		&cfg.MaxTableOffset,
		name("max-table-offset"),
		2,
		"Oldest day-table offset (from today, inclusive) to compact. This can be used to lower cost by not trying to compact older data which doesn't change. This can be optimized by aligning it with the maximum `reject_old_samples_max_age` setting of any tenant.",
	)
}

// Validate returns an error when MinTableOffset is greater than
// MaxTableOffset, and nil otherwise.
func (cfg *Config) Validate() error {
	if cfg.MinTableOffset <= cfg.MaxTableOffset {
		return nil
	}

	return fmt.Errorf(
		"min-table-offset (%d) must be less than or equal to max-table-offset (%d)",
		cfg.MinTableOffset,
		cfg.MaxTableOffset,
	)
}
||||
|
||||
// Limits is the set of per-tenant settings this package reads.
type Limits interface {
	// BloomCreationEnabled presumably reports whether blooms should be
	// created for the given tenant (semantics are defined by the implementer).
	BloomCreationEnabled(tenantID string) bool

	// BloomSplitSeriesKeyspaceBy presumably returns the factor by which the
	// tenant's series keyspace is split; verify against the implementation.
	BloomSplitSeriesKeyspaceBy(tenantID string) int
}
||||
|
@ -0,0 +1,321 @@ |
||||
package planner |
||||
|
||||
import ( |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/prometheus/common/model" |
||||
"github.com/stretchr/testify/require" |
||||
|
||||
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" |
||||
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" |
||||
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" |
||||
) |
||||
|
||||
func tsdbID(n int) tsdb.SingleTenantTSDBIdentifier { |
||||
return tsdb.SingleTenantTSDBIdentifier{ |
||||
TS: time.Unix(int64(n), 0), |
||||
} |
||||
} |
||||
|
||||
func genMeta(min, max model.Fingerprint, sources []int, blocks []bloomshipper.BlockRef) bloomshipper.Meta { |
||||
m := bloomshipper.Meta{ |
||||
MetaRef: bloomshipper.MetaRef{ |
||||
Ref: bloomshipper.Ref{ |
||||
Bounds: v1.NewBounds(min, max), |
||||
}, |
||||
}, |
||||
Blocks: blocks, |
||||
} |
||||
for _, source := range sources { |
||||
m.Sources = append(m.Sources, tsdbID(source)) |
||||
} |
||||
return m |
||||
} |
||||
|
||||
func Test_gapsBetweenTSDBsAndMetas(t *testing.T) { |
||||
|
||||
for _, tc := range []struct { |
||||
desc string |
||||
err bool |
||||
exp []tsdbGaps |
||||
ownershipRange v1.FingerprintBounds |
||||
tsdbs []tsdb.SingleTenantTSDBIdentifier |
||||
metas []bloomshipper.Meta |
||||
}{ |
||||
{ |
||||
desc: "non-overlapping tsdbs and metas", |
||||
err: true, |
||||
ownershipRange: v1.NewBounds(0, 10), |
||||
tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)}, |
||||
metas: []bloomshipper.Meta{ |
||||
genMeta(11, 20, []int{0}, nil), |
||||
}, |
||||
}, |
||||
{ |
||||
desc: "single tsdb", |
||||
ownershipRange: v1.NewBounds(0, 10), |
||||
tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)}, |
||||
metas: []bloomshipper.Meta{ |
||||
genMeta(4, 8, []int{0}, nil), |
||||
}, |
||||
exp: []tsdbGaps{ |
||||
{ |
||||
tsdb: tsdbID(0), |
||||
gaps: []v1.FingerprintBounds{ |
||||
v1.NewBounds(0, 3), |
||||
v1.NewBounds(9, 10), |
||||
}, |
||||
}, |
||||
}, |
||||
}, |
||||
{ |
||||
desc: "multiple tsdbs with separate blocks", |
||||
ownershipRange: v1.NewBounds(0, 10), |
||||
tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0), tsdbID(1)}, |
||||
metas: []bloomshipper.Meta{ |
||||
genMeta(0, 5, []int{0}, nil), |
||||
genMeta(6, 10, []int{1}, nil), |
||||
}, |
||||
exp: []tsdbGaps{ |
||||
{ |
||||
tsdb: tsdbID(0), |
||||
gaps: []v1.FingerprintBounds{ |
||||
v1.NewBounds(6, 10), |
||||
}, |
||||
}, |
||||
{ |
||||
tsdb: tsdbID(1), |
||||
gaps: []v1.FingerprintBounds{ |
||||
v1.NewBounds(0, 5), |
||||
}, |
||||
}, |
||||
}, |
||||
}, |
||||
{ |
||||
desc: "multiple tsdbs with the same blocks", |
||||
ownershipRange: v1.NewBounds(0, 10), |
||||
tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0), tsdbID(1)}, |
||||
metas: []bloomshipper.Meta{ |
||||
genMeta(0, 5, []int{0, 1}, nil), |
||||
genMeta(6, 8, []int{1}, nil), |
||||
}, |
||||
exp: []tsdbGaps{ |
||||
{ |
||||
tsdb: tsdbID(0), |
||||
gaps: []v1.FingerprintBounds{ |
||||
v1.NewBounds(6, 10), |
||||
}, |
||||
}, |
||||
{ |
||||
tsdb: tsdbID(1), |
||||
gaps: []v1.FingerprintBounds{ |
||||
v1.NewBounds(9, 10), |
||||
}, |
||||
}, |
||||
}, |
||||
}, |
||||
} { |
||||
t.Run(tc.desc, func(t *testing.T) { |
||||
gaps, err := gapsBetweenTSDBsAndMetas(tc.ownershipRange, tc.tsdbs, tc.metas) |
||||
if tc.err { |
||||
require.Error(t, err) |
||||
return |
||||
} |
||||
require.Equal(t, tc.exp, gaps) |
||||
}) |
||||
} |
||||
} |
||||
|
||||
func genBlockRef(min, max model.Fingerprint) bloomshipper.BlockRef { |
||||
bounds := v1.NewBounds(min, max) |
||||
return bloomshipper.BlockRef{ |
||||
Ref: bloomshipper.Ref{ |
||||
Bounds: bounds, |
||||
}, |
||||
} |
||||
} |
||||
|
||||
func Test_blockPlansForGaps(t *testing.T) { |
||||
for _, tc := range []struct { |
||||
desc string |
||||
ownershipRange v1.FingerprintBounds |
||||
tsdbs []tsdb.SingleTenantTSDBIdentifier |
||||
metas []bloomshipper.Meta |
||||
err bool |
||||
exp []blockPlan |
||||
}{ |
||||
{ |
||||
desc: "single overlapping meta+no overlapping block", |
||||
ownershipRange: v1.NewBounds(0, 10), |
||||
tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)}, |
||||
metas: []bloomshipper.Meta{ |
||||
genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(11, 20)}), |
||||
}, |
||||
exp: []blockPlan{ |
||||
{ |
||||
tsdb: tsdbID(0), |
||||
gaps: []GapWithBlocks{ |
||||
{ |
||||
bounds: v1.NewBounds(0, 10), |
||||
}, |
||||
}, |
||||
}, |
||||
}, |
||||
}, |
||||
{ |
||||
desc: "single overlapping meta+one overlapping block", |
||||
ownershipRange: v1.NewBounds(0, 10), |
||||
tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)}, |
||||
metas: []bloomshipper.Meta{ |
||||
genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), |
||||
}, |
||||
exp: []blockPlan{ |
||||
{ |
||||
tsdb: tsdbID(0), |
||||
gaps: []GapWithBlocks{ |
||||
{ |
||||
bounds: v1.NewBounds(0, 10), |
||||
blocks: []bloomshipper.BlockRef{genBlockRef(9, 20)}, |
||||
}, |
||||
}, |
||||
}, |
||||
}, |
||||
}, |
||||
{ |
||||
// the range which needs to be generated doesn't overlap with existing blocks
|
||||
// from other tsdb versions since theres an up to date tsdb version block,
|
||||
// but we can trim the range needing generation
|
||||
desc: "trims up to date area", |
||||
ownershipRange: v1.NewBounds(0, 10), |
||||
tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)}, |
||||
metas: []bloomshipper.Meta{ |
||||
genMeta(9, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for same tsdb
|
||||
genMeta(9, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for different tsdb
|
||||
}, |
||||
exp: []blockPlan{ |
||||
{ |
||||
tsdb: tsdbID(0), |
||||
gaps: []GapWithBlocks{ |
||||
{ |
||||
bounds: v1.NewBounds(0, 8), |
||||
}, |
||||
}, |
||||
}, |
||||
}, |
||||
}, |
||||
{ |
||||
desc: "uses old block for overlapping range", |
||||
ownershipRange: v1.NewBounds(0, 10), |
||||
tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)}, |
||||
metas: []bloomshipper.Meta{ |
||||
genMeta(9, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for same tsdb
|
||||
genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(5, 20)}), // block for different tsdb
|
||||
}, |
||||
exp: []blockPlan{ |
||||
{ |
||||
tsdb: tsdbID(0), |
||||
gaps: []GapWithBlocks{ |
||||
{ |
||||
bounds: v1.NewBounds(0, 8), |
||||
blocks: []bloomshipper.BlockRef{genBlockRef(5, 20)}, |
||||
}, |
||||
}, |
||||
}, |
||||
}, |
||||
}, |
||||
{ |
||||
desc: "multi case", |
||||
ownershipRange: v1.NewBounds(0, 10), |
||||
tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0), tsdbID(1)}, // generate for both tsdbs
|
||||
metas: []bloomshipper.Meta{ |
||||
genMeta(0, 2, []int{0}, []bloomshipper.BlockRef{ |
||||
genBlockRef(0, 1), |
||||
genBlockRef(1, 2), |
||||
}), // tsdb_0
|
||||
genMeta(6, 8, []int{0}, []bloomshipper.BlockRef{genBlockRef(6, 8)}), // tsdb_0
|
||||
|
||||
genMeta(3, 5, []int{1}, []bloomshipper.BlockRef{genBlockRef(3, 5)}), // tsdb_1
|
||||
genMeta(8, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(8, 10)}), // tsdb_1
|
||||
}, |
||||
exp: []blockPlan{ |
||||
{ |
||||
tsdb: tsdbID(0), |
||||
gaps: []GapWithBlocks{ |
||||
// tsdb (id=0) can source chunks from the blocks built from tsdb (id=1)
|
||||
{ |
||||
bounds: v1.NewBounds(3, 5), |
||||
blocks: []bloomshipper.BlockRef{genBlockRef(3, 5)}, |
||||
}, |
||||
{ |
||||
bounds: v1.NewBounds(9, 10), |
||||
blocks: []bloomshipper.BlockRef{genBlockRef(8, 10)}, |
||||
}, |
||||
}, |
||||
}, |
||||
// tsdb (id=1) can source chunks from the blocks built from tsdb (id=0)
|
||||
{ |
||||
tsdb: tsdbID(1), |
||||
gaps: []GapWithBlocks{ |
||||
{ |
||||
bounds: v1.NewBounds(0, 2), |
||||
blocks: []bloomshipper.BlockRef{ |
||||
genBlockRef(0, 1), |
||||
genBlockRef(1, 2), |
||||
}, |
||||
}, |
||||
{ |
||||
bounds: v1.NewBounds(6, 7), |
||||
blocks: []bloomshipper.BlockRef{genBlockRef(6, 8)}, |
||||
}, |
||||
}, |
||||
}, |
||||
}, |
||||
}, |
||||
{ |
||||
desc: "dedupes block refs", |
||||
ownershipRange: v1.NewBounds(0, 10), |
||||
tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)}, |
||||
metas: []bloomshipper.Meta{ |
||||
genMeta(9, 20, []int{1}, []bloomshipper.BlockRef{ |
||||
genBlockRef(1, 4), |
||||
genBlockRef(9, 20), |
||||
}), // blocks for first diff tsdb
|
||||
genMeta(5, 20, []int{2}, []bloomshipper.BlockRef{ |
||||
genBlockRef(5, 10), |
||||
genBlockRef(9, 20), // same block references in prior meta (will be deduped)
|
||||
}), // block for second diff tsdb
|
||||
}, |
||||
exp: []blockPlan{ |
||||
{ |
||||
tsdb: tsdbID(0), |
||||
gaps: []GapWithBlocks{ |
||||
{ |
||||
bounds: v1.NewBounds(0, 10), |
||||
blocks: []bloomshipper.BlockRef{ |
||||
genBlockRef(1, 4), |
||||
genBlockRef(5, 10), |
||||
genBlockRef(9, 20), |
||||
}, |
||||
}, |
||||
}, |
||||
}, |
||||
}, |
||||
}, |
||||
} { |
||||
t.Run(tc.desc, func(t *testing.T) { |
||||
// we reuse the gapsBetweenTSDBsAndMetas function to generate the gaps as this function is tested
|
||||
// separately and it's used to generate input in our regular code path (easier to write tests this way).
|
||||
gaps, err := gapsBetweenTSDBsAndMetas(tc.ownershipRange, tc.tsdbs, tc.metas) |
||||
require.NoError(t, err) |
||||
|
||||
plans, err := blockPlansForGaps(gaps, tc.metas) |
||||
if tc.err { |
||||
require.Error(t, err) |
||||
return |
||||
} |
||||
require.Equal(t, tc.exp, plans) |
||||
|
||||
}) |
||||
} |
||||
} |
@ -0,0 +1,50 @@ |
||||
package planner |
||||
|
||||
import ( |
||||
"fmt" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/storage/config" |
||||
) |
||||
|
||||
type dayRangeIterator struct { |
||||
min, max, cur config.DayTime |
||||
curPeriod config.PeriodConfig |
||||
schemaCfg config.SchemaConfig |
||||
err error |
||||
} |
||||
|
||||
func newDayRangeIterator(min, max config.DayTime, schemaCfg config.SchemaConfig) *dayRangeIterator { |
||||
return &dayRangeIterator{min: min, max: max, cur: min.Dec(), schemaCfg: schemaCfg} |
||||
} |
||||
|
||||
func (r *dayRangeIterator) TotalDays() int { |
||||
offset := r.cur |
||||
if r.cur.Before(r.min) { |
||||
offset = r.min |
||||
} |
||||
return int(r.max.Sub(offset.Time) / config.ObjectStorageIndexRequiredPeriod) |
||||
} |
||||
|
||||
func (r *dayRangeIterator) Next() bool { |
||||
r.cur = r.cur.Inc() |
||||
if !r.cur.Before(r.max) { |
||||
return false |
||||
} |
||||
|
||||
period, err := r.schemaCfg.SchemaForTime(r.cur.ModelTime()) |
||||
if err != nil { |
||||
r.err = fmt.Errorf("getting schema for time (%s): %w", r.cur, err) |
||||
return false |
||||
} |
||||
r.curPeriod = period |
||||
|
||||
return true |
||||
} |
||||
|
||||
func (r *dayRangeIterator) At() config.DayTable { |
||||
return config.NewDayTable(r.cur, r.curPeriod.IndexTables.Prefix) |
||||
} |
||||
|
||||
func (r *dayRangeIterator) Err() error { |
||||
return nil |
||||
} |
@ -0,0 +1,22 @@ |
||||
package planner |
||||
|
||||
import ( |
||||
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" |
||||
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" |
||||
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" |
||||
) |
||||
|
||||
// TODO: Extract this definition to a proto file at pkg/bloombuild/protos/protos.proto
|
||||
|
||||
type GapWithBlocks struct { |
||||
bounds v1.FingerprintBounds |
||||
blocks []bloomshipper.BlockRef |
||||
} |
||||
|
||||
type Task struct { |
||||
table string |
||||
tenant string |
||||
OwnershipBounds v1.FingerprintBounds |
||||
tsdb tsdb.SingleTenantTSDBIdentifier |
||||
gaps []GapWithBlocks |
||||
} |
@ -0,0 +1,261 @@ |
||||
package planner |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"io" |
||||
"math" |
||||
"path" |
||||
"strings" |
||||
|
||||
"github.com/go-kit/log" |
||||
"github.com/go-kit/log/level" |
||||
"github.com/pkg/errors" |
||||
"github.com/prometheus/common/model" |
||||
"github.com/prometheus/prometheus/model/labels" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/chunkenc" |
||||
baseStore "github.com/grafana/loki/v3/pkg/storage" |
||||
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" |
||||
"github.com/grafana/loki/v3/pkg/storage/config" |
||||
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/storage" |
||||
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" |
||||
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index" |
||||
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding" |
||||
"github.com/grafana/loki/v3/pkg/storage/types" |
||||
) |
||||
|
||||
// gzipExtension is the file-name suffix of gzip-compressed index files.
const gzipExtension = ".gz"
||||
|
||||
type TSDBStore interface { |
||||
UsersForPeriod(ctx context.Context, table config.DayTable) ([]string, error) |
||||
ResolveTSDBs(ctx context.Context, table config.DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) |
||||
LoadTSDB( |
||||
ctx context.Context, |
||||
table config.DayTable, |
||||
tenant string, |
||||
id tsdb.Identifier, |
||||
bounds v1.FingerprintBounds, |
||||
) (v1.Iterator[*v1.Series], error) |
||||
} |
||||
|
||||
// BloomTSDBStore is a wrapper around the storage.Client interface which
|
||||
// implements the TSDBStore interface for this pkg.
|
||||
type BloomTSDBStore struct { |
||||
storage storage.Client |
||||
logger log.Logger |
||||
} |
||||
|
||||
func NewBloomTSDBStore(storage storage.Client, logger log.Logger) *BloomTSDBStore { |
||||
return &BloomTSDBStore{ |
||||
storage: storage, |
||||
logger: logger, |
||||
} |
||||
} |
||||
|
||||
func (b *BloomTSDBStore) UsersForPeriod(ctx context.Context, table config.DayTable) ([]string, error) { |
||||
_, users, err := b.storage.ListFiles(ctx, table.Addr(), true) // bypass cache for ease of testing
|
||||
return users, err |
||||
} |
||||
|
||||
func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table config.DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) { |
||||
indices, err := b.storage.ListUserFiles(ctx, table.Addr(), tenant, true) // bypass cache for ease of testing
|
||||
if err != nil { |
||||
return nil, errors.Wrap(err, "failed to list user files") |
||||
} |
||||
|
||||
ids := make([]tsdb.SingleTenantTSDBIdentifier, 0, len(indices)) |
||||
for _, index := range indices { |
||||
key := index.Name |
||||
if decompress := storage.IsCompressedFile(index.Name); decompress { |
||||
key = strings.TrimSuffix(key, gzipExtension) |
||||
} |
||||
|
||||
id, ok := tsdb.ParseSingleTenantTSDBPath(path.Base(key)) |
||||
if !ok { |
||||
return nil, errors.Errorf("failed to parse single tenant tsdb path: %s", key) |
||||
} |
||||
|
||||
ids = append(ids, id) |
||||
|
||||
} |
||||
return ids, nil |
||||
} |
||||
|
||||
func (b *BloomTSDBStore) LoadTSDB( |
||||
ctx context.Context, |
||||
table config.DayTable, |
||||
tenant string, |
||||
id tsdb.Identifier, |
||||
bounds v1.FingerprintBounds, |
||||
) (v1.Iterator[*v1.Series], error) { |
||||
withCompression := id.Name() + gzipExtension |
||||
|
||||
data, err := b.storage.GetUserFile(ctx, table.Addr(), tenant, withCompression) |
||||
if err != nil { |
||||
return nil, errors.Wrap(err, "failed to get file") |
||||
} |
||||
defer data.Close() |
||||
|
||||
decompressorPool := chunkenc.GetReaderPool(chunkenc.EncGZIP) |
||||
decompressor, err := decompressorPool.GetReader(data) |
||||
if err != nil { |
||||
return nil, errors.Wrap(err, "failed to get decompressor") |
||||
} |
||||
defer decompressorPool.PutReader(decompressor) |
||||
|
||||
buf, err := io.ReadAll(decompressor) |
||||
if err != nil { |
||||
return nil, errors.Wrap(err, "failed to read file") |
||||
} |
||||
|
||||
reader, err := index.NewReader(index.RealByteSlice(buf)) |
||||
if err != nil { |
||||
return nil, errors.Wrap(err, "failed to create index reader") |
||||
} |
||||
|
||||
idx := tsdb.NewTSDBIndex(reader) |
||||
defer func() { |
||||
if err := idx.Close(); err != nil { |
||||
level.Error(b.logger).Log("msg", "failed to close index", "err", err) |
||||
} |
||||
}() |
||||
|
||||
return NewTSDBSeriesIter(ctx, tenant, idx, bounds) |
||||
} |
||||
|
||||
func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, bounds v1.FingerprintBounds) (v1.Iterator[*v1.Series], error) { |
||||
// TODO(salvacorts): Create a pool
|
||||
series := make([]*v1.Series, 0, 100) |
||||
|
||||
if err := f.ForSeries( |
||||
ctx, |
||||
user, |
||||
bounds, |
||||
0, math.MaxInt64, |
||||
func(_ labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) (stop bool) { |
||||
select { |
||||
case <-ctx.Done(): |
||||
return true |
||||
default: |
||||
res := &v1.Series{ |
||||
Fingerprint: fp, |
||||
Chunks: make(v1.ChunkRefs, 0, len(chks)), |
||||
} |
||||
for _, chk := range chks { |
||||
res.Chunks = append(res.Chunks, v1.ChunkRef{ |
||||
From: model.Time(chk.MinTime), |
||||
Through: model.Time(chk.MaxTime), |
||||
Checksum: chk.Checksum, |
||||
}) |
||||
} |
||||
|
||||
series = append(series, res) |
||||
return false |
||||
} |
||||
}, |
||||
labels.MustNewMatcher(labels.MatchEqual, "", ""), |
||||
); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
select { |
||||
case <-ctx.Done(): |
||||
return v1.NewEmptyIter[*v1.Series](), ctx.Err() |
||||
default: |
||||
return v1.NewCancelableIter[*v1.Series](ctx, v1.NewSliceIter[*v1.Series](series)), nil |
||||
} |
||||
} |
||||
|
||||
type TSDBStores struct { |
||||
schemaCfg config.SchemaConfig |
||||
stores []TSDBStore |
||||
} |
||||
|
||||
func NewTSDBStores( |
||||
schemaCfg config.SchemaConfig, |
||||
storeCfg baseStore.Config, |
||||
clientMetrics baseStore.ClientMetrics, |
||||
logger log.Logger, |
||||
) (*TSDBStores, error) { |
||||
res := &TSDBStores{ |
||||
schemaCfg: schemaCfg, |
||||
stores: make([]TSDBStore, len(schemaCfg.Configs)), |
||||
} |
||||
|
||||
for i, cfg := range schemaCfg.Configs { |
||||
if cfg.IndexType == types.TSDBType { |
||||
|
||||
c, err := baseStore.NewObjectClient(cfg.ObjectType, storeCfg, clientMetrics) |
||||
if err != nil { |
||||
return nil, errors.Wrap(err, "failed to create object client") |
||||
} |
||||
res.stores[i] = NewBloomTSDBStore(storage.NewIndexStorageClient(c, cfg.IndexTables.PathPrefix), logger) |
||||
} |
||||
} |
||||
|
||||
return res, nil |
||||
} |
||||
|
||||
func (s *TSDBStores) storeForPeriod(table config.DayTime) (TSDBStore, error) { |
||||
for i := len(s.schemaCfg.Configs) - 1; i >= 0; i-- { |
||||
period := s.schemaCfg.Configs[i] |
||||
|
||||
if !table.Before(period.From) { |
||||
// we have the desired period config
|
||||
|
||||
if s.stores[i] != nil { |
||||
// valid: it's of tsdb type
|
||||
return s.stores[i], nil |
||||
} |
||||
|
||||
// invalid
|
||||
return nil, errors.Errorf( |
||||
"store for period is not of TSDB type (%s) while looking up store for (%v)", |
||||
period.IndexType, |
||||
table, |
||||
) |
||||
} |
||||
|
||||
} |
||||
|
||||
return nil, fmt.Errorf( |
||||
"there is no store matching no matching period found for table (%v) -- too early", |
||||
table, |
||||
) |
||||
} |
||||
|
||||
func (s *TSDBStores) UsersForPeriod(ctx context.Context, table config.DayTable) ([]string, error) { |
||||
store, err := s.storeForPeriod(table.DayTime) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return store.UsersForPeriod(ctx, table) |
||||
} |
||||
|
||||
func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table config.DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) { |
||||
store, err := s.storeForPeriod(table.DayTime) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return store.ResolveTSDBs(ctx, table, tenant) |
||||
} |
||||
|
||||
func (s *TSDBStores) LoadTSDB( |
||||
ctx context.Context, |
||||
table config.DayTable, |
||||
tenant string, |
||||
id tsdb.Identifier, |
||||
bounds v1.FingerprintBounds, |
||||
) (v1.Iterator[*v1.Series], error) { |
||||
store, err := s.storeForPeriod(table.DayTime) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return store.LoadTSDB(ctx, table, tenant, id, bounds) |
||||
} |
@ -0,0 +1,105 @@ |
||||
package planner |
||||
|
||||
import ( |
||||
"context" |
||||
"math" |
||||
"testing" |
||||
|
||||
"github.com/prometheus/common/model" |
||||
"github.com/prometheus/prometheus/model/labels" |
||||
"github.com/stretchr/testify/require" |
||||
|
||||
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" |
||||
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index" |
||||
) |
||||
|
||||
type forSeriesTestImpl []*v1.Series |
||||
|
||||
func (f forSeriesTestImpl) ForSeries( |
||||
_ context.Context, |
||||
_ string, |
||||
_ index.FingerprintFilter, |
||||
_ model.Time, |
||||
_ model.Time, |
||||
fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta) bool, |
||||
_ ...*labels.Matcher, |
||||
) error { |
||||
for i := range f { |
||||
unmapped := make([]index.ChunkMeta, 0, len(f[i].Chunks)) |
||||
for _, c := range f[i].Chunks { |
||||
unmapped = append(unmapped, index.ChunkMeta{ |
||||
MinTime: int64(c.From), |
||||
MaxTime: int64(c.Through), |
||||
Checksum: c.Checksum, |
||||
}) |
||||
} |
||||
|
||||
fn(nil, f[i].Fingerprint, unmapped) |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (f forSeriesTestImpl) Close() error { |
||||
return nil |
||||
} |
||||
|
||||
func TestTSDBSeriesIter(t *testing.T) { |
||||
input := []*v1.Series{ |
||||
{ |
||||
Fingerprint: 1, |
||||
Chunks: []v1.ChunkRef{ |
||||
{ |
||||
From: 0, |
||||
Through: 1, |
||||
Checksum: 2, |
||||
}, |
||||
{ |
||||
From: 3, |
||||
Through: 4, |
||||
Checksum: 5, |
||||
}, |
||||
}, |
||||
}, |
||||
} |
||||
srcItr := v1.NewSliceIter(input) |
||||
itr, err := NewTSDBSeriesIter(context.Background(), "", forSeriesTestImpl(input), v1.NewBounds(0, math.MaxUint64)) |
||||
require.NoError(t, err) |
||||
|
||||
v1.EqualIterators[*v1.Series]( |
||||
t, |
||||
func(a, b *v1.Series) { |
||||
require.Equal(t, a, b) |
||||
}, |
||||
itr, |
||||
srcItr, |
||||
) |
||||
} |
||||
|
||||
func TestTSDBSeriesIter_Expiry(t *testing.T) { |
||||
t.Run("expires on creation", func(t *testing.T) { |
||||
ctx, cancel := context.WithCancel(context.Background()) |
||||
cancel() |
||||
itr, err := NewTSDBSeriesIter(ctx, "", forSeriesTestImpl{ |
||||
{}, // a single entry
|
||||
}, v1.NewBounds(0, math.MaxUint64)) |
||||
require.Error(t, err) |
||||
require.False(t, itr.Next()) |
||||
}) |
||||
|
||||
t.Run("expires during consumption", func(t *testing.T) { |
||||
ctx, cancel := context.WithCancel(context.Background()) |
||||
itr, err := NewTSDBSeriesIter(ctx, "", forSeriesTestImpl{ |
||||
{}, |
||||
{}, |
||||
}, v1.NewBounds(0, math.MaxUint64)) |
||||
require.NoError(t, err) |
||||
|
||||
require.True(t, itr.Next()) |
||||
require.NoError(t, itr.Err()) |
||||
|
||||
cancel() |
||||
require.False(t, itr.Next()) |
||||
require.Error(t, itr.Err()) |
||||
}) |
||||
|
||||
} |
@ -0,0 +1,125 @@ |
||||
package planner |
||||
|
||||
import ( |
||||
"fmt" |
||||
"math" |
||||
|
||||
"github.com/prometheus/common/model" |
||||
|
||||
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" |
||||
) |
||||
|
||||
// SplitFingerprintKeyspaceByFactor splits the keyspace covered by model.Fingerprint into contiguous non-overlapping ranges.
|
||||
func SplitFingerprintKeyspaceByFactor(factor int) []v1.FingerprintBounds { |
||||
if factor <= 0 { |
||||
return nil |
||||
} |
||||
|
||||
bounds := make([]v1.FingerprintBounds, 0, factor) |
||||
|
||||
// The keyspace of a Fingerprint is from 0 to max uint64.
|
||||
keyspaceSize := uint64(math.MaxUint64) |
||||
|
||||
// Calculate the size of each range.
|
||||
rangeSize := keyspaceSize / uint64(factor) |
||||
|
||||
for i := 0; i < factor; i++ { |
||||
// Calculate the start and end of the range.
|
||||
start := uint64(i) * rangeSize |
||||
end := start + rangeSize - 1 |
||||
|
||||
// For the last range, make sure it ends at the end of the keyspace.
|
||||
if i == factor-1 { |
||||
end = keyspaceSize |
||||
} |
||||
|
||||
// Create a FingerprintBounds for the range and add it to the slice.
|
||||
bounds = append(bounds, v1.FingerprintBounds{ |
||||
Min: model.Fingerprint(start), |
||||
Max: model.Fingerprint(end), |
||||
}) |
||||
} |
||||
|
||||
return bounds |
||||
} |
||||
|
||||
func FindGapsInFingerprintBounds(ownershipRange v1.FingerprintBounds, metas []v1.FingerprintBounds) (gaps []v1.FingerprintBounds, err error) { |
||||
if len(metas) == 0 { |
||||
return []v1.FingerprintBounds{ownershipRange}, nil |
||||
} |
||||
|
||||
// turn the available metas into a list of non-overlapping metas
|
||||
// for easier processing
|
||||
var nonOverlapping []v1.FingerprintBounds |
||||
// First, we reduce the metas into a smaller set by combining overlaps. They must be sorted.
|
||||
var cur *v1.FingerprintBounds |
||||
for i := 0; i < len(metas); i++ { |
||||
j := i + 1 |
||||
|
||||
// first iteration (i == 0), set the current meta
|
||||
if cur == nil { |
||||
cur = &metas[i] |
||||
} |
||||
|
||||
if j >= len(metas) { |
||||
// We've reached the end of the list. Add the last meta to the non-overlapping set.
|
||||
nonOverlapping = append(nonOverlapping, *cur) |
||||
break |
||||
} |
||||
|
||||
combined := cur.Union(metas[j]) |
||||
if len(combined) == 1 { |
||||
// There was an overlap between the two tested ranges. Combine them and keep going.
|
||||
cur = &combined[0] |
||||
continue |
||||
} |
||||
|
||||
// There was no overlap between the two tested ranges. Add the first to the non-overlapping set.
|
||||
// and keep the second for the next iteration.
|
||||
nonOverlapping = append(nonOverlapping, combined[0]) |
||||
cur = &combined[1] |
||||
} |
||||
|
||||
// Now, detect gaps between the non-overlapping metas and the ownership range.
|
||||
// The left bound of the ownership range will be adjusted as we go.
|
||||
leftBound := ownershipRange.Min |
||||
for _, meta := range nonOverlapping { |
||||
|
||||
clippedMeta := meta.Intersection(ownershipRange) |
||||
// should never happen as long as we are only combining metas
|
||||
// that intersect with the ownership range
|
||||
if clippedMeta == nil { |
||||
return nil, fmt.Errorf("meta is not within ownership range: %v", meta) |
||||
} |
||||
|
||||
searchRange := ownershipRange.Slice(leftBound, clippedMeta.Max) |
||||
// update the left bound for the next iteration
|
||||
// We do the max to prevent the max bound to overflow from MaxUInt64 to 0
|
||||
leftBound = min( |
||||
max(clippedMeta.Max+1, clippedMeta.Max), |
||||
max(ownershipRange.Max+1, ownershipRange.Max), |
||||
) |
||||
|
||||
// since we've already ensured that the meta is within the ownership range,
|
||||
// we know the xor will be of length zero (when the meta is equal to the ownership range)
|
||||
// or 1 (when the meta is a subset of the ownership range)
|
||||
xors := searchRange.Unless(*clippedMeta) |
||||
if len(xors) == 0 { |
||||
// meta is equal to the ownership range. This means the meta
|
||||
// covers this entire section of the ownership range.
|
||||
continue |
||||
} |
||||
|
||||
gaps = append(gaps, xors[0]) |
||||
} |
||||
|
||||
// If the leftBound is less than the ownership range max, and it's smaller than MaxUInt64,
|
||||
// There is a gap between the last meta and the end of the ownership range.
|
||||
// Note: we check `leftBound < math.MaxUint64` since in the loop above we clamp the
|
||||
// leftBound to MaxUint64 to prevent an overflow to 0: `max(clippedMeta.Max+1, clippedMeta.Max)`
|
||||
if leftBound < math.MaxUint64 && leftBound <= ownershipRange.Max { |
||||
gaps = append(gaps, v1.NewBounds(leftBound, ownershipRange.Max)) |
||||
} |
||||
|
||||
return gaps, nil |
||||
} |
@ -0,0 +1,172 @@ |
||||
package planner |
||||
|
||||
import ( |
||||
"math" |
||||
"testing" |
||||
|
||||
"github.com/prometheus/common/model" |
||||
"github.com/stretchr/testify/require" |
||||
|
||||
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" |
||||
) |
||||
|
||||
func TestSplitFingerprintKeyspaceByFactor(t *testing.T) { |
||||
for _, tt := range []struct { |
||||
name string |
||||
factor int |
||||
}{ |
||||
{ |
||||
name: "Factor is 0", |
||||
factor: 0, |
||||
}, |
||||
{ |
||||
name: "Factor is 1", |
||||
factor: 1, |
||||
}, |
||||
{ |
||||
name: "Factor is 256", |
||||
factor: 256, |
||||
}, |
||||
} { |
||||
t.Run(tt.name, func(t *testing.T) { |
||||
got := SplitFingerprintKeyspaceByFactor(tt.factor) |
||||
|
||||
if tt.factor == 0 { |
||||
require.Empty(t, got) |
||||
return |
||||
} |
||||
|
||||
// Check overall min and max values of the ranges.
|
||||
require.Equal(t, model.Fingerprint(math.MaxUint64), got[len(got)-1].Max) |
||||
require.Equal(t, model.Fingerprint(0), got[0].Min) |
||||
|
||||
// For each range, check that the max value of the previous range is one less than the min value of the current range.
|
||||
for i := 1; i < len(got); i++ { |
||||
require.Equal(t, got[i-1].Max+1, got[i].Min) |
||||
} |
||||
}) |
||||
} |
||||
} |
||||
|
||||
func Test_FindGapsInFingerprintBounds(t *testing.T) { |
||||
for _, tc := range []struct { |
||||
desc string |
||||
err bool |
||||
exp []v1.FingerprintBounds |
||||
ownershipRange v1.FingerprintBounds |
||||
metas []v1.FingerprintBounds |
||||
}{ |
||||
{ |
||||
desc: "error nonoverlapping metas", |
||||
err: true, |
||||
exp: nil, |
||||
ownershipRange: v1.NewBounds(0, 10), |
||||
metas: []v1.FingerprintBounds{v1.NewBounds(11, 20)}, |
||||
}, |
||||
{ |
||||
desc: "one meta with entire ownership range", |
||||
err: false, |
||||
exp: nil, |
||||
ownershipRange: v1.NewBounds(0, 10), |
||||
metas: []v1.FingerprintBounds{v1.NewBounds(0, 10)}, |
||||
}, |
||||
{ |
||||
desc: "two non-overlapping metas with entire ownership range", |
||||
err: false, |
||||
exp: nil, |
||||
ownershipRange: v1.NewBounds(0, 10), |
||||
metas: []v1.FingerprintBounds{ |
||||
v1.NewBounds(0, 5), |
||||
v1.NewBounds(6, 10), |
||||
}, |
||||
}, |
||||
{ |
||||
desc: "two overlapping metas with entire ownership range", |
||||
err: false, |
||||
exp: nil, |
||||
ownershipRange: v1.NewBounds(0, 10), |
||||
metas: []v1.FingerprintBounds{ |
||||
v1.NewBounds(0, 6), |
||||
v1.NewBounds(4, 10), |
||||
}, |
||||
}, |
||||
{ |
||||
desc: "one meta with partial ownership range", |
||||
err: false, |
||||
exp: []v1.FingerprintBounds{ |
||||
v1.NewBounds(6, 10), |
||||
}, |
||||
ownershipRange: v1.NewBounds(0, 10), |
||||
metas: []v1.FingerprintBounds{ |
||||
v1.NewBounds(0, 5), |
||||
}, |
||||
}, |
||||
{ |
||||
desc: "smaller subsequent meta with partial ownership range", |
||||
err: false, |
||||
exp: []v1.FingerprintBounds{ |
||||
v1.NewBounds(8, 10), |
||||
}, |
||||
ownershipRange: v1.NewBounds(0, 10), |
||||
metas: []v1.FingerprintBounds{ |
||||
v1.NewBounds(0, 7), |
||||
v1.NewBounds(3, 4), |
||||
}, |
||||
}, |
||||
{ |
||||
desc: "hole in the middle", |
||||
err: false, |
||||
exp: []v1.FingerprintBounds{ |
||||
v1.NewBounds(4, 5), |
||||
}, |
||||
ownershipRange: v1.NewBounds(0, 10), |
||||
metas: []v1.FingerprintBounds{ |
||||
v1.NewBounds(0, 3), |
||||
v1.NewBounds(6, 10), |
||||
}, |
||||
}, |
||||
{ |
||||
desc: "holes on either end", |
||||
err: false, |
||||
exp: []v1.FingerprintBounds{ |
||||
v1.NewBounds(0, 2), |
||||
v1.NewBounds(8, 10), |
||||
}, |
||||
ownershipRange: v1.NewBounds(0, 10), |
||||
metas: []v1.FingerprintBounds{ |
||||
v1.NewBounds(3, 5), |
||||
v1.NewBounds(6, 7), |
||||
}, |
||||
}, |
||||
{ |
||||
desc: "full ownership range with single meta", |
||||
err: false, |
||||
exp: nil, |
||||
ownershipRange: v1.NewBounds(0, math.MaxUint64), |
||||
metas: []v1.FingerprintBounds{ |
||||
v1.NewBounds(0, math.MaxUint64), |
||||
}, |
||||
}, |
||||
{ |
||||
desc: "full ownership range with multiple metas", |
||||
err: false, |
||||
exp: nil, |
||||
ownershipRange: v1.NewBounds(0, math.MaxUint64), |
||||
// Three metas covering the whole 0 - MaxUint64
|
||||
metas: []v1.FingerprintBounds{ |
||||
v1.NewBounds(0, math.MaxUint64/3), |
||||
v1.NewBounds(math.MaxUint64/3+1, math.MaxUint64/2), |
||||
v1.NewBounds(math.MaxUint64/2+1, math.MaxUint64), |
||||
}, |
||||
}, |
||||
} { |
||||
t.Run(tc.desc, func(t *testing.T) { |
||||
gaps, err := FindGapsInFingerprintBounds(tc.ownershipRange, tc.metas) |
||||
if tc.err { |
||||
require.Error(t, err) |
||||
return |
||||
} |
||||
require.Equal(t, tc.exp, gaps) |
||||
}) |
||||
} |
||||
} |
Loading…
Reference in new issue