feat: Extract task computing into a strategy interface (#13690)

pull/14520/head
Salva Corts authored 2 years ago, committed by GitHub
parent 82fb2f0ae2
commit ab5e6eaaee
  1. docs/sources/shared/configuration.md (11 lines changed)
  2. pkg/bloombuild/planner/config.go (4 lines changed)
  3. pkg/bloombuild/planner/planner.go (239 lines changed)
  4. pkg/bloombuild/planner/planner_test.go (349 lines changed)
  5. pkg/bloombuild/planner/strategies/factory.go (45 lines changed)
  6. pkg/bloombuild/planner/strategies/splitkeyspace.go (240 lines changed)
  7. pkg/bloombuild/planner/strategies/splitkeyspace_test.go (432 lines changed)
  8. pkg/bloombuild/planner/strategies/util.go (125 lines changed)
  9. pkg/validation/limits.go (8 lines changed)

@@ -3764,9 +3764,14 @@ shard_streams:
# CLI flag: -bloom-build.enable
[bloom_creation_enabled: <boolean> | default = false]
# Experimental. Number of splits to create for the series keyspace when building
# blooms. The series keyspace is split into this many parts to parallelize bloom
# creation.
# Experimental. Bloom planning strategy to use in bloom creation. Can be one of:
# 'split_keyspace_by_factor'
# CLI flag: -bloom-build.planning-strategy
[bloom_planning_strategy: <string> | default = "split_keyspace_by_factor"]
# Experimental. Only if `bloom-build.planning-strategy` is
# 'split_keyspace_by_factor'. Number of splits to create for the series
# keyspace when building blooms. The series keyspace is split into this many
# parts to parallelize bloom creation.
# CLI flag: -bloom-build.split-keyspace-by
[bloom_split_series_keyspace_by: <int> | default = 256]
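The two options compose: the strategy selects the planning algorithm, and the split factor only applies to the keyspace-splitting strategy. A minimal sketch of how they might be set under `limits_config` (the surrounding YAML structure is an assumption based on the option names above):

```yaml
limits_config:
  # Enable bloom creation for the tenant.
  bloom_creation_enabled: true
  # Select the planning strategy (currently only one is available).
  bloom_planning_strategy: split_keyspace_by_factor
  # Parallelism for the keyspace-splitting strategy.
  bloom_split_series_keyspace_by: 256
```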

@@ -4,6 +4,8 @@ import (
"flag"
"fmt"
"time"
"github.com/grafana/loki/v3/pkg/bloombuild/planner/strategies"
)
// Config configures the bloom-planner component.
@@ -44,8 +46,8 @@ func (cfg *Config) Validate() error {
type Limits interface {
RetentionLimits
strategies.Limits
BloomCreationEnabled(tenantID string) bool
BloomSplitSeriesKeyspaceBy(tenantID string) int
BloomBuildMaxBuilders(tenantID string) int
BuilderResponseTimeout(tenantID string) time.Duration
BloomTaskMaxRetries(tenantID string) int

@@ -17,6 +17,7 @@ import (
"go.uber.org/atomic"
"github.com/grafana/loki/v3/pkg/bloombuild/common"
"github.com/grafana/loki/v3/pkg/bloombuild/planner/strategies"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
iter "github.com/grafana/loki/v3/pkg/iter/v2"
"github.com/grafana/loki/v3/pkg/queue"
@@ -254,7 +255,7 @@ func (p *Planner) runOne(ctx context.Context) error {
tables := p.tables(time.Now())
level.Debug(p.logger).Log("msg", "loaded tables", "tables", tables.TotalDays())
work, err := p.loadTenantWork(ctx, tables)
tenantTables, err := p.loadTenantTables(ctx, tables)
if err != nil {
return fmt.Errorf("error loading work: %w", err)
}
@@ -265,19 +266,21 @@ func (p *Planner) runOne(ctx context.Context) error {
tasksResultForTenantTable := make(map[tenantTable]tenantTableTaskResults)
var totalTasks int
for table, tenants := range work {
for tenant, ownershipRanges := range tenants {
for table, tenants := range tenantTables {
for _, tenant := range tenants {
logger := log.With(p.logger, "tenant", tenant, "table", table.Addr())
tt := tenantTable{
tenant: tenant,
table: table,
}
tasks, existingMetas, err := p.computeTasks(ctx, table, tenant, ownershipRanges)
tasks, existingMetas, err := p.computeTasks(ctx, table, tenant)
if err != nil {
level.Error(logger).Log("msg", "error computing tasks", "err", err)
level.Error(logger).Log("msg", "failed to compute tasks", "err", err)
continue
}
level.Debug(logger).Log("msg", "computed tasks", "tasks", len(tasks), "existingMetas", len(existingMetas))
var tenantTableEnqueuedTasks int
@@ -367,17 +370,20 @@ func (p *Planner) runOne(ctx context.Context) error {
return nil
}
// computeTasks computes the tasks for a given table and tenant and ownership range.
// It returns the tasks to be executed and the metas that exist and are relevant for the ownership range.
// computeTasks computes the tasks for a given table and tenant.
// It returns the tasks to be executed and the existing metas.
func (p *Planner) computeTasks(
ctx context.Context,
table config.DayTable,
tenant string,
ownershipRanges []v1.FingerprintBounds,
) ([]*protos.Task, []bloomshipper.Meta, error) {
var tasks []*protos.Task
logger := log.With(p.logger, "table", table.Addr(), "tenant", tenant)
strategy, err := strategies.NewStrategy(tenant, p.limits, p.logger)
if err != nil {
return nil, nil, fmt.Errorf("error creating strategy: %w", err)
}
// Fetch source metas to be used in both build and cleanup of out-of-date metas+blooms
metas, err := p.bloomStore.FetchMetas(
ctx,
@@ -421,22 +427,9 @@ func (p *Planner) computeTasks(
}
}()
for _, ownershipRange := range ownershipRanges {
logger := log.With(logger, "ownership", ownershipRange.String())
// Filter only the metas that overlap in the ownership range
metasInBounds := bloomshipper.FilterMetasOverlappingBounds(metas, ownershipRange)
// Find gaps in the TSDBs for this tenant/table
gaps, err := p.findOutdatedGaps(ctx, tenant, openTSDBs, ownershipRange, metasInBounds, logger)
if err != nil {
level.Error(logger).Log("msg", "failed to find outdated gaps", "err", err)
continue
}
for _, gap := range gaps {
tasks = append(tasks, protos.NewTask(table, tenant, ownershipRange, gap.tsdb, gap.gaps))
}
tasks, err := strategy.Plan(ctx, table, tenant, openTSDBs, metas)
if err != nil {
return nil, nil, fmt.Errorf("failed to plan tasks: %w", err)
}
return tasks, metas, nil
@@ -649,15 +642,12 @@ func (p *Planner) tables(ts time.Time) *dayRangeIterator {
return newDayRangeIterator(fromDay, throughDay, p.schemaCfg)
}
type work map[config.DayTable]map[string][]v1.FingerprintBounds
// loadTenantWork loads the work for each tenant and table tuple.
// work is the list of fingerprint ranges that need to be indexed in bloom filters.
func (p *Planner) loadTenantWork(
// loadTenantTables loads all tenants with bloom build enabled for each table.
func (p *Planner) loadTenantTables(
ctx context.Context,
tables *dayRangeIterator,
) (work, error) {
tenantTableWork := make(map[config.DayTable]map[string][]v1.FingerprintBounds, tables.TotalDays())
) (map[config.DayTable][]string, error) {
tenantTables := make(map[config.DayTable][]string, tables.TotalDays())
for tables.Next() && tables.Err() == nil && ctx.Err() == nil {
table := tables.At()
@@ -670,8 +660,8 @@ func (p *Planner) loadTenantWork(
level.Debug(p.logger).Log("msg", "loaded tenants", "table", table, "tenants", tenants.Remaining())
// If this is the first time we see this table, initialize the map
if tenantTableWork[table] == nil {
tenantTableWork[table] = make(map[string][]v1.FingerprintBounds, tenants.Remaining())
if tenantTables[table] == nil {
tenantTables[table] = make([]string, 0, tenants.Remaining())
}
for tenants.Next() && tenants.Err() == nil && ctx.Err() == nil {
@@ -683,11 +673,6 @@ func (p *Planner) loadTenantWork(
continue
}
splitFactor := p.limits.BloomSplitSeriesKeyspaceBy(tenant)
bounds := SplitFingerprintKeyspaceByFactor(splitFactor)
tenantTableWork[table][tenant] = bounds
// Reset progress tracking metrics for this tenant
// NOTE(salvacorts): We will reset them multiple times for the same tenant, for each table, but it's not a big deal.
// Alternatively, we can use a Counter instead of a Gauge, but I think a Gauge is easier to reason about.
@@ -695,7 +680,7 @@ func (p *Planner) loadTenantWork(
p.metrics.tenantTasksCompleted.WithLabelValues(tenant, statusSuccess).Set(0)
p.metrics.tenantTasksCompleted.WithLabelValues(tenant, statusFailure).Set(0)
level.Debug(p.logger).Log("msg", "loading work for tenant", "table", table, "tenant", tenant, "splitFactor", splitFactor)
tenantTables[table] = append(tenantTables[table], tenant)
}
if err := tenants.Err(); err != nil {
level.Error(p.logger).Log("msg", "error iterating tenants", "err", err)
@@ -708,7 +693,7 @@ func (p *Planner) loadTenantWork(
return nil, fmt.Errorf("error iterating tables: %w", err)
}
return tenantTableWork, ctx.Err()
return tenantTables, ctx.Err()
}
func (p *Planner) tenants(ctx context.Context, table config.DayTable) (*iter.SliceIter[string], error) {
@@ -720,178 +705,6 @@ func (p *Planner) tenants(ctx context.Context, table config.DayTable) (*iter.Sli
return iter.NewSliceIter(tenants), nil
}
// blockPlan is a plan for all the work needed to build a meta.json
// It includes:
// - the tsdb (source of truth) which contains all the series+chunks
// we need to ensure are indexed in bloom blocks
// - a list of gaps that are out of date and need to be checked+built
// - within each gap, a list of block refs which overlap the gap are included
// so we can use them to accelerate bloom generation. They likely contain many
// of the same chunks we need to ensure are indexed, just from previous tsdb iterations.
// This is a performance optimization to avoid expensive re-reindexing
type blockPlan struct {
tsdb tsdb.SingleTenantTSDBIdentifier
gaps []protos.Gap
}
func (p *Planner) findOutdatedGaps(
ctx context.Context,
tenant string,
tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries,
ownershipRange v1.FingerprintBounds,
metas []bloomshipper.Meta,
logger log.Logger,
) ([]blockPlan, error) {
// Determine which TSDBs have gaps in the ownership range and need to
// be processed.
tsdbsWithGaps, err := gapsBetweenTSDBsAndMetas(ownershipRange, tsdbs, metas)
if err != nil {
level.Error(logger).Log("msg", "failed to find gaps", "err", err)
return nil, fmt.Errorf("failed to find gaps: %w", err)
}
if len(tsdbsWithGaps) == 0 {
level.Debug(logger).Log("msg", "blooms exist for all tsdbs")
return nil, nil
}
work, err := blockPlansForGaps(ctx, tenant, tsdbsWithGaps, metas)
if err != nil {
level.Error(logger).Log("msg", "failed to create plan", "err", err)
return nil, fmt.Errorf("failed to create plan: %w", err)
}
return work, nil
}
// Used to signal the gaps that need to be populated for a tsdb
type tsdbGaps struct {
tsdbIdentifier tsdb.SingleTenantTSDBIdentifier
tsdb common.ClosableForSeries
gaps []v1.FingerprintBounds
}
// gapsBetweenTSDBsAndMetas returns, for each TSDB, the parts of the ownership range not yet covered by up-to-date metas.
// Metas are up-to-date with a TSDB when metas generated from that specific TSDB cover the entire ownership range.
func gapsBetweenTSDBsAndMetas(
ownershipRange v1.FingerprintBounds,
tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries,
metas []bloomshipper.Meta,
) (res []tsdbGaps, err error) {
for db, tsdb := range tsdbs {
id := db.Name()
relevantMetas := make([]v1.FingerprintBounds, 0, len(metas))
for _, meta := range metas {
for _, s := range meta.Sources {
if s.Name() == id {
relevantMetas = append(relevantMetas, meta.Bounds)
}
}
}
gaps, err := FindGapsInFingerprintBounds(ownershipRange, relevantMetas)
if err != nil {
return nil, err
}
if len(gaps) > 0 {
res = append(res, tsdbGaps{
tsdbIdentifier: db,
tsdb: tsdb,
gaps: gaps,
})
}
}
return res, err
}
// blockPlansForGaps groups tsdb gaps we wish to fill with overlapping but out of date blocks.
// This allows us to expedite bloom generation by using existing blocks to fill in the gaps
// since many will contain the same chunks.
func blockPlansForGaps(
ctx context.Context,
tenant string,
tsdbs []tsdbGaps,
metas []bloomshipper.Meta,
) ([]blockPlan, error) {
plans := make([]blockPlan, 0, len(tsdbs))
for _, idx := range tsdbs {
plan := blockPlan{
tsdb: idx.tsdbIdentifier,
gaps: make([]protos.Gap, 0, len(idx.gaps)),
}
for _, gap := range idx.gaps {
planGap := protos.Gap{
Bounds: gap,
}
seriesItr, err := common.NewTSDBSeriesIter(ctx, tenant, idx.tsdb, gap)
if err != nil {
return nil, fmt.Errorf("failed to load series from TSDB for gap (%s): %w", gap.String(), err)
}
planGap.Series, err = iter.Collect(seriesItr)
if err != nil {
return nil, fmt.Errorf("failed to collect series: %w", err)
}
for _, meta := range metas {
if meta.Bounds.Intersection(gap) == nil {
// this meta doesn't overlap the gap, skip
continue
}
for _, block := range meta.Blocks {
if block.Bounds.Intersection(gap) == nil {
// this block doesn't overlap the gap, skip
continue
}
// this block overlaps the gap, add it to the plan
// for this gap
planGap.Blocks = append(planGap.Blocks, block)
}
}
// ensure we sort blocks so the deduping iterator works as expected
sort.Slice(planGap.Blocks, func(i, j int) bool {
return planGap.Blocks[i].Bounds.Less(planGap.Blocks[j].Bounds)
})
peekingBlocks := iter.NewPeekIter[bloomshipper.BlockRef](
iter.NewSliceIter[bloomshipper.BlockRef](
planGap.Blocks,
),
)
// dedupe blocks which could be in multiple metas
itr := iter.NewDedupingIter[bloomshipper.BlockRef, bloomshipper.BlockRef](
func(a, b bloomshipper.BlockRef) bool {
return a == b
},
iter.Identity[bloomshipper.BlockRef],
func(a, _ bloomshipper.BlockRef) bloomshipper.BlockRef {
return a
},
peekingBlocks,
)
deduped, err := iter.Collect[bloomshipper.BlockRef](itr)
if err != nil {
return nil, fmt.Errorf("failed to dedupe blocks: %w", err)
}
planGap.Blocks = deduped
plan.gaps = append(plan.gaps, planGap)
}
plans = append(plans, plan)
}
return plans, nil
}
func (p *Planner) addPendingTask(task *QueueTask) {
p.pendingTasks.Store(task.ID, task)
}

@@ -16,12 +16,10 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"google.golang.org/grpc"
"github.com/grafana/loki/v3/pkg/bloombuild/common"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
"github.com/grafana/loki/v3/pkg/compression"
iter "github.com/grafana/loki/v3/pkg/iter/v2"
@@ -33,7 +31,6 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
bloomshipperconfig "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
"github.com/grafana/loki/v3/pkg/storage/types"
"github.com/grafana/loki/v3/pkg/util/mempool"
)
@@ -64,110 +61,6 @@ func genMeta(min, max model.Fingerprint, sources []int, blocks []bloomshipper.Bl
return m
}
func Test_gapsBetweenTSDBsAndMetas(t *testing.T) {
for _, tc := range []struct {
desc string
err bool
exp []tsdbGaps
ownershipRange v1.FingerprintBounds
tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries
metas []bloomshipper.Meta
}{
{
desc: "non-overlapping tsdbs and metas",
err: true,
ownershipRange: v1.NewBounds(0, 10),
tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{
tsdbID(0): nil,
},
metas: []bloomshipper.Meta{
genMeta(11, 20, []int{0}, nil),
},
},
{
desc: "single tsdb",
ownershipRange: v1.NewBounds(0, 10),
tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{
tsdbID(0): nil,
},
metas: []bloomshipper.Meta{
genMeta(4, 8, []int{0}, nil),
},
exp: []tsdbGaps{
{
tsdbIdentifier: tsdbID(0),
gaps: []v1.FingerprintBounds{
v1.NewBounds(0, 3),
v1.NewBounds(9, 10),
},
},
},
},
{
desc: "multiple tsdbs with separate blocks",
ownershipRange: v1.NewBounds(0, 10),
tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{
tsdbID(0): nil,
tsdbID(1): nil,
},
metas: []bloomshipper.Meta{
genMeta(0, 5, []int{0}, nil),
genMeta(6, 10, []int{1}, nil),
},
exp: []tsdbGaps{
{
tsdbIdentifier: tsdbID(0),
gaps: []v1.FingerprintBounds{
v1.NewBounds(6, 10),
},
},
{
tsdbIdentifier: tsdbID(1),
gaps: []v1.FingerprintBounds{
v1.NewBounds(0, 5),
},
},
},
},
{
desc: "multiple tsdbs with the same blocks",
ownershipRange: v1.NewBounds(0, 10),
tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{
tsdbID(0): nil,
tsdbID(1): nil,
},
metas: []bloomshipper.Meta{
genMeta(0, 5, []int{0, 1}, nil),
genMeta(6, 8, []int{1}, nil),
},
exp: []tsdbGaps{
{
tsdbIdentifier: tsdbID(0),
gaps: []v1.FingerprintBounds{
v1.NewBounds(6, 10),
},
},
{
tsdbIdentifier: tsdbID(1),
gaps: []v1.FingerprintBounds{
v1.NewBounds(9, 10),
},
},
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
gaps, err := gapsBetweenTSDBsAndMetas(tc.ownershipRange, tc.tsdbs, tc.metas)
if tc.err {
require.Error(t, err)
return
}
require.ElementsMatch(t, tc.exp, gaps)
})
}
}
func genBlockRef(min, max model.Fingerprint) bloomshipper.BlockRef {
startTS, endTS := testDay.Bounds()
return bloomshipper.BlockRef{
@@ -214,209 +107,6 @@ func genBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) {
}, nil
}
func Test_blockPlansForGaps(t *testing.T) {
for _, tc := range []struct {
desc string
ownershipRange v1.FingerprintBounds
tsdbs []tsdb.SingleTenantTSDBIdentifier
metas []bloomshipper.Meta
err bool
exp []blockPlan
}{
{
desc: "single overlapping meta+no overlapping block",
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)},
metas: []bloomshipper.Meta{
genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(11, 20)}),
},
exp: []blockPlan{
{
tsdb: tsdbID(0),
gaps: []protos.Gap{
{
Bounds: v1.NewBounds(0, 10),
Series: genSeries(v1.NewBounds(0, 10)),
},
},
},
},
},
{
desc: "single overlapping meta+one overlapping block",
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)},
metas: []bloomshipper.Meta{
genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(9, 20)}),
},
exp: []blockPlan{
{
tsdb: tsdbID(0),
gaps: []protos.Gap{
{
Bounds: v1.NewBounds(0, 10),
Series: genSeries(v1.NewBounds(0, 10)),
Blocks: []bloomshipper.BlockRef{genBlockRef(9, 20)},
},
},
},
},
},
{
// the range which needs to be generated doesn't overlap with existing blocks
// from other tsdb versions since there's an up-to-date tsdb version block,
// but we can trim the range needing generation
desc: "trims up to date area",
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)},
metas: []bloomshipper.Meta{
genMeta(9, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for same tsdb
genMeta(9, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for different tsdb
},
exp: []blockPlan{
{
tsdb: tsdbID(0),
gaps: []protos.Gap{
{
Bounds: v1.NewBounds(0, 8),
Series: genSeries(v1.NewBounds(0, 8)),
},
},
},
},
},
{
desc: "uses old block for overlapping range",
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)},
metas: []bloomshipper.Meta{
genMeta(9, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for same tsdb
genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(5, 20)}), // block for different tsdb
},
exp: []blockPlan{
{
tsdb: tsdbID(0),
gaps: []protos.Gap{
{
Bounds: v1.NewBounds(0, 8),
Series: genSeries(v1.NewBounds(0, 8)),
Blocks: []bloomshipper.BlockRef{genBlockRef(5, 20)},
},
},
},
},
},
{
desc: "multi case",
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0), tsdbID(1)}, // generate for both tsdbs
metas: []bloomshipper.Meta{
genMeta(0, 2, []int{0}, []bloomshipper.BlockRef{
genBlockRef(0, 1),
genBlockRef(1, 2),
}), // tsdb_0
genMeta(6, 8, []int{0}, []bloomshipper.BlockRef{genBlockRef(6, 8)}), // tsdb_0
genMeta(3, 5, []int{1}, []bloomshipper.BlockRef{genBlockRef(3, 5)}), // tsdb_1
genMeta(8, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(8, 10)}), // tsdb_1
},
exp: []blockPlan{
{
tsdb: tsdbID(0),
gaps: []protos.Gap{
// tsdb (id=0) can source chunks from the blocks built from tsdb (id=1)
{
Bounds: v1.NewBounds(3, 5),
Series: genSeries(v1.NewBounds(3, 5)),
Blocks: []bloomshipper.BlockRef{genBlockRef(3, 5)},
},
{
Bounds: v1.NewBounds(9, 10),
Series: genSeries(v1.NewBounds(9, 10)),
Blocks: []bloomshipper.BlockRef{genBlockRef(8, 10)},
},
},
},
// tsdb (id=1) can source chunks from the blocks built from tsdb (id=0)
{
tsdb: tsdbID(1),
gaps: []protos.Gap{
{
Bounds: v1.NewBounds(0, 2),
Series: genSeries(v1.NewBounds(0, 2)),
Blocks: []bloomshipper.BlockRef{
genBlockRef(0, 1),
genBlockRef(1, 2),
},
},
{
Bounds: v1.NewBounds(6, 7),
Series: genSeries(v1.NewBounds(6, 7)),
Blocks: []bloomshipper.BlockRef{genBlockRef(6, 8)},
},
},
},
},
},
{
desc: "dedupes block refs",
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)},
metas: []bloomshipper.Meta{
genMeta(9, 20, []int{1}, []bloomshipper.BlockRef{
genBlockRef(1, 4),
genBlockRef(9, 20),
}), // blocks for first diff tsdb
genMeta(5, 20, []int{2}, []bloomshipper.BlockRef{
genBlockRef(5, 10),
genBlockRef(9, 20), // same block references in prior meta (will be deduped)
}), // block for second diff tsdb
},
exp: []blockPlan{
{
tsdb: tsdbID(0),
gaps: []protos.Gap{
{
Bounds: v1.NewBounds(0, 10),
Series: genSeries(v1.NewBounds(0, 10)),
Blocks: []bloomshipper.BlockRef{
genBlockRef(1, 4),
genBlockRef(5, 10),
genBlockRef(9, 20),
},
},
},
},
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
// We add series spanning the whole FP ownership range
tsdbs := make(map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries)
for _, id := range tc.tsdbs {
tsdbs[id] = newFakeForSeries(genSeries(tc.ownershipRange))
}
// we reuse the gapsBetweenTSDBsAndMetas function to generate the gaps as this function is tested
// separately and it's used to generate input in our regular code path (easier to write tests this way).
gaps, err := gapsBetweenTSDBsAndMetas(tc.ownershipRange, tsdbs, tc.metas)
require.NoError(t, err)
plans, err := blockPlansForGaps(
context.Background(),
"fakeTenant",
gaps,
tc.metas,
)
if tc.err {
require.Error(t, err)
return
}
require.ElementsMatch(t, tc.exp, plans)
})
}
}
func genSeries(bounds v1.FingerprintBounds) []*v1.Series {
series := make([]*v1.Series, 0, int(bounds.Max-bounds.Min+1))
for i := bounds.Min; i <= bounds.Max; i++ {
@@ -434,45 +124,6 @@ func genSeries(bounds v1.FingerprintBounds) []*v1.Series {
return series
}
type fakeForSeries struct {
series []*v1.Series
}
func newFakeForSeries(series []*v1.Series) *fakeForSeries {
return &fakeForSeries{
series: series,
}
}
func (f fakeForSeries) ForSeries(_ context.Context, _ string, ff index.FingerprintFilter, _ model.Time, _ model.Time, fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta) (stop bool), _ ...*labels.Matcher) error {
overlapping := make([]*v1.Series, 0, len(f.series))
for _, s := range f.series {
if ff.Match(s.Fingerprint) {
overlapping = append(overlapping, s)
}
}
for _, s := range overlapping {
chunks := make([]index.ChunkMeta, 0, len(s.Chunks))
for _, c := range s.Chunks {
chunks = append(chunks, index.ChunkMeta{
MinTime: int64(c.From),
MaxTime: int64(c.Through),
Checksum: c.Checksum,
})
}
if fn(labels.EmptyLabels(), s.Fingerprint, chunks) {
break
}
}
return nil
}
func (f fakeForSeries) Close() error {
return nil
}
func createTasks(n int, resultsCh chan *protos.TaskResult) []*QueueTask {
tasks := make([]*QueueTask, 0, n)
// Enqueue tasks

@@ -0,0 +1,45 @@
package strategies
import (
"context"
"fmt"
"github.com/go-kit/log"
"github.com/grafana/loki/v3/pkg/bloombuild/common"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
)
const (
SplitKeyspaceStrategyName = "split_keyspace_by_factor"
)
type Limits interface {
BloomPlanningStrategy(tenantID string) string
BloomSplitSeriesKeyspaceBy(tenantID string) int
}
type TSDBSet = map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries
type PlanningStrategy interface {
// Plan returns a set of tasks for a given tenant-table tuple and TSDBs.
Plan(ctx context.Context, table config.DayTable, tenant string, tsdbs TSDBSet, metas []bloomshipper.Meta) ([]*protos.Task, error)
}
func NewStrategy(
tenantID string,
limits Limits,
logger log.Logger,
) (PlanningStrategy, error) {
strategy := limits.BloomPlanningStrategy(tenantID)
switch strategy {
case SplitKeyspaceStrategyName:
return NewSplitKeyspaceStrategy(limits, logger)
default:
return nil, fmt.Errorf("unknown bloom planning strategy (%s)", strategy)
}
}
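Since `NewStrategy` is the single registration point, adding a strategy means implementing `PlanningStrategy` and extending the switch above with a new case. A minimal sketch of a hypothetical implementation (the `noopStrategy` name and behavior are illustrative, not part of this commit):

```go
package strategies

import (
	"context"

	"github.com/grafana/loki/v3/pkg/bloombuild/protos"
	"github.com/grafana/loki/v3/pkg/storage/config"
	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
)

// noopStrategy plans no tasks. It exists only to show the shape of a
// PlanningStrategy implementation; a real strategy would inspect the
// TSDBs and metas to compute gaps.
type noopStrategy struct{}

func (s *noopStrategy) Plan(
	_ context.Context,
	_ config.DayTable,
	_ string,
	_ TSDBSet,
	_ []bloomshipper.Meta,
) ([]*protos.Task, error) {
	// Returning no tasks means the planner enqueues nothing for this tenant-table.
	return nil, nil
}
```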

@@ -0,0 +1,240 @@
package strategies
import (
"context"
"fmt"
"sort"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/loki/v3/pkg/bloombuild/common"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
iter "github.com/grafana/loki/v3/pkg/iter/v2"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
)
type SplitKeyspaceStrategy struct {
limits Limits
logger log.Logger
}
func NewSplitKeyspaceStrategy(
limits Limits,
logger log.Logger,
) (*SplitKeyspaceStrategy, error) {
return &SplitKeyspaceStrategy{
limits: limits,
logger: logger,
}, nil
}
func (s *SplitKeyspaceStrategy) Plan(
ctx context.Context,
table config.DayTable,
tenant string,
tsdbs TSDBSet,
metas []bloomshipper.Meta,
) ([]*protos.Task, error) {
splitFactor := s.limits.BloomSplitSeriesKeyspaceBy(tenant)
ownershipRanges := SplitFingerprintKeyspaceByFactor(splitFactor)
logger := log.With(s.logger, "table", table.Addr(), "tenant", tenant)
level.Debug(s.logger).Log("msg", "loading work for tenant", "splitFactor", splitFactor)
var tasks []*protos.Task
for _, ownershipRange := range ownershipRanges {
logger := log.With(logger, "ownership", ownershipRange.String())
// Filter only the metas that overlap in the ownership range
metasInBounds := bloomshipper.FilterMetasOverlappingBounds(metas, ownershipRange)
// Find gaps in the TSDBs for this tenant/table
gaps, err := s.findOutdatedGaps(ctx, tenant, tsdbs, ownershipRange, metasInBounds, logger)
if err != nil {
level.Error(logger).Log("msg", "failed to find outdated gaps", "err", err)
continue
}
for _, gap := range gaps {
tasks = append(tasks, protos.NewTask(table, tenant, ownershipRange, gap.tsdb, gap.gaps))
}
}
return tasks, nil
}
// blockPlan is a plan for all the work needed to build a meta.json
// It includes:
// - the tsdb (source of truth) which contains all the series+chunks
// we need to ensure are indexed in bloom blocks
// - a list of gaps that are out of date and need to be checked+built
// - within each gap, a list of block refs which overlap the gap are included
// so we can use them to accelerate bloom generation. They likely contain many
// of the same chunks we need to ensure are indexed, just from previous tsdb iterations.
// This is a performance optimization to avoid expensive re-reindexing
type blockPlan struct {
tsdb tsdb.SingleTenantTSDBIdentifier
gaps []protos.Gap
}
func (s *SplitKeyspaceStrategy) findOutdatedGaps(
ctx context.Context,
tenant string,
tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries,
ownershipRange v1.FingerprintBounds,
metas []bloomshipper.Meta,
logger log.Logger,
) ([]blockPlan, error) {
// Determine which TSDBs have gaps in the ownership range and need to
// be processed.
tsdbsWithGaps, err := gapsBetweenTSDBsAndMetas(ownershipRange, tsdbs, metas)
if err != nil {
level.Error(logger).Log("msg", "failed to find gaps", "err", err)
return nil, fmt.Errorf("failed to find gaps: %w", err)
}
if len(tsdbsWithGaps) == 0 {
level.Debug(logger).Log("msg", "blooms exist for all tsdbs")
return nil, nil
}
work, err := blockPlansForGaps(ctx, tenant, tsdbsWithGaps, metas)
if err != nil {
level.Error(logger).Log("msg", "failed to create plan", "err", err)
return nil, fmt.Errorf("failed to create plan: %w", err)
}
return work, nil
}
// Used to signal the gaps that need to be populated for a tsdb
type tsdbGaps struct {
tsdbIdentifier tsdb.SingleTenantTSDBIdentifier
tsdb common.ClosableForSeries
gaps []v1.FingerprintBounds
}
// gapsBetweenTSDBsAndMetas returns, for each TSDB, the parts of the ownership range not yet covered by up-to-date metas.
// Metas are up-to-date with a TSDB when metas generated from that specific TSDB cover the entire ownership range.
func gapsBetweenTSDBsAndMetas(
ownershipRange v1.FingerprintBounds,
tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries,
metas []bloomshipper.Meta,
) (res []tsdbGaps, err error) {
for db, tsdb := range tsdbs {
id := db.Name()
relevantMetas := make([]v1.FingerprintBounds, 0, len(metas))
for _, meta := range metas {
for _, s := range meta.Sources {
if s.Name() == id {
relevantMetas = append(relevantMetas, meta.Bounds)
}
}
}
gaps, err := FindGapsInFingerprintBounds(ownershipRange, relevantMetas)
if err != nil {
return nil, err
}
if len(gaps) > 0 {
res = append(res, tsdbGaps{
tsdbIdentifier: db,
tsdb: tsdb,
gaps: gaps,
})
}
}
return res, err
}
// blockPlansForGaps groups tsdb gaps we wish to fill with overlapping but out of date blocks.
// This allows us to expedite bloom generation by using existing blocks to fill in the gaps
// since many will contain the same chunks.
func blockPlansForGaps(
ctx context.Context,
tenant string,
tsdbs []tsdbGaps,
metas []bloomshipper.Meta,
) ([]blockPlan, error) {
plans := make([]blockPlan, 0, len(tsdbs))
for _, idx := range tsdbs {
plan := blockPlan{
tsdb: idx.tsdbIdentifier,
gaps: make([]protos.Gap, 0, len(idx.gaps)),
}
for _, gap := range idx.gaps {
planGap := protos.Gap{
Bounds: gap,
}
seriesItr, err := common.NewTSDBSeriesIter(ctx, tenant, idx.tsdb, gap)
if err != nil {
return nil, fmt.Errorf("failed to load series from TSDB for gap (%s): %w", gap.String(), err)
}
planGap.Series, err = iter.Collect(seriesItr)
if err != nil {
return nil, fmt.Errorf("failed to collect series: %w", err)
}
for _, meta := range metas {
if meta.Bounds.Intersection(gap) == nil {
// this meta doesn't overlap the gap, skip
continue
}
for _, block := range meta.Blocks {
if block.Bounds.Intersection(gap) == nil {
// this block doesn't overlap the gap, skip
continue
}
// this block overlaps the gap, add it to the plan
// for this gap
planGap.Blocks = append(planGap.Blocks, block)
}
}
// ensure we sort blocks so the deduping iterator works as expected
sort.Slice(planGap.Blocks, func(i, j int) bool {
return planGap.Blocks[i].Bounds.Less(planGap.Blocks[j].Bounds)
})
peekingBlocks := iter.NewPeekIter[bloomshipper.BlockRef](
iter.NewSliceIter[bloomshipper.BlockRef](
planGap.Blocks,
),
)
// dedupe blocks which could be in multiple metas
itr := iter.NewDedupingIter[bloomshipper.BlockRef, bloomshipper.BlockRef](
func(a, b bloomshipper.BlockRef) bool {
return a == b
},
iter.Identity[bloomshipper.BlockRef],
func(a, _ bloomshipper.BlockRef) bloomshipper.BlockRef {
return a
},
peekingBlocks,
)
deduped, err := iter.Collect[bloomshipper.BlockRef](itr)
if err != nil {
return nil, fmt.Errorf("failed to dedupe blocks: %w", err)
}
planGap.Blocks = deduped
plan.gaps = append(plan.gaps, planGap)
}
plans = append(plans, plan)
}
return plans, nil
}
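The sort-then-dedupe step at the end of `blockPlansForGaps` leans on the generic deduping iterator from `pkg/iter/v2`. A standalone sketch over ints, using the same call shape as the block-ref usage above (that these signatures behave identically outside this snippet is an assumption):

```go
package main

import (
	"fmt"

	iter "github.com/grafana/loki/v3/pkg/iter/v2"
)

func main() {
	// Sorted input with duplicates, mirroring block refs that appear in multiple metas.
	in := iter.NewPeekIter[int](iter.NewSliceIter[int]([]int{1, 1, 2, 3, 3, 3}))

	itr := iter.NewDedupingIter[int, int](
		func(a, b int) bool { return a == b }, // adjacent elements considered duplicates
		iter.Identity[int],                    // no conversion between input and output types
		func(a, _ int) int { return a },       // keep the first element of a duplicate run
		in,
	)

	deduped, err := iter.Collect[int](itr)
	if err != nil {
		panic(err)
	}
	fmt.Println(deduped) // [1 2 3]
}
```

Deduplication only collapses adjacent duplicates, which is why the blocks are sorted by bounds first.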

@@ -0,0 +1,432 @@
package strategies
import (
"context"
"testing"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/bloombuild/common"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
)
var testDay = parseDayTime("2023-09-01")
var testTable = config.NewDayTable(testDay, "index_")
func Test_gapsBetweenTSDBsAndMetas(t *testing.T) {
for _, tc := range []struct {
desc string
err bool
exp []tsdbGaps
ownershipRange v1.FingerprintBounds
tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries
metas []bloomshipper.Meta
}{
{
desc: "non-overlapping tsdbs and metas",
err: true,
ownershipRange: v1.NewBounds(0, 10),
tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{
tsdbID(0): nil,
},
metas: []bloomshipper.Meta{
genMeta(11, 20, []int{0}, nil),
},
},
{
desc: "single tsdb",
ownershipRange: v1.NewBounds(0, 10),
tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{
tsdbID(0): nil,
},
metas: []bloomshipper.Meta{
genMeta(4, 8, []int{0}, nil),
},
exp: []tsdbGaps{
{
tsdbIdentifier: tsdbID(0),
gaps: []v1.FingerprintBounds{
v1.NewBounds(0, 3),
v1.NewBounds(9, 10),
},
},
},
},
{
desc: "multiple tsdbs with separate blocks",
ownershipRange: v1.NewBounds(0, 10),
tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{
tsdbID(0): nil,
tsdbID(1): nil,
},
metas: []bloomshipper.Meta{
genMeta(0, 5, []int{0}, nil),
genMeta(6, 10, []int{1}, nil),
},
exp: []tsdbGaps{
{
tsdbIdentifier: tsdbID(0),
gaps: []v1.FingerprintBounds{
v1.NewBounds(6, 10),
},
},
{
tsdbIdentifier: tsdbID(1),
gaps: []v1.FingerprintBounds{
v1.NewBounds(0, 5),
},
},
},
},
{
desc: "multiple tsdbs with the same blocks",
ownershipRange: v1.NewBounds(0, 10),
tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{
tsdbID(0): nil,
tsdbID(1): nil,
},
metas: []bloomshipper.Meta{
genMeta(0, 5, []int{0, 1}, nil),
genMeta(6, 8, []int{1}, nil),
},
exp: []tsdbGaps{
{
tsdbIdentifier: tsdbID(0),
gaps: []v1.FingerprintBounds{
v1.NewBounds(6, 10),
},
},
{
tsdbIdentifier: tsdbID(1),
gaps: []v1.FingerprintBounds{
v1.NewBounds(9, 10),
},
},
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
gaps, err := gapsBetweenTSDBsAndMetas(tc.ownershipRange, tc.tsdbs, tc.metas)
if tc.err {
require.Error(t, err)
return
}
require.Equal(t, tc.exp, gaps)
})
}
}
func Test_blockPlansForGaps(t *testing.T) {
for _, tc := range []struct {
desc string
ownershipRange v1.FingerprintBounds
tsdbs []tsdb.SingleTenantTSDBIdentifier
metas []bloomshipper.Meta
err bool
exp []blockPlan
}{
{
desc: "single overlapping meta+no overlapping block",
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)},
metas: []bloomshipper.Meta{
genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(11, 20)}),
},
exp: []blockPlan{
{
tsdb: tsdbID(0),
gaps: []protos.Gap{
{
Bounds: v1.NewBounds(0, 10),
Series: genSeries(v1.NewBounds(0, 10)),
},
},
},
},
},
{
desc: "single overlapping meta+one overlapping block",
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)},
metas: []bloomshipper.Meta{
genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(9, 20)}),
},
exp: []blockPlan{
{
tsdb: tsdbID(0),
gaps: []protos.Gap{
{
Bounds: v1.NewBounds(0, 10),
Series: genSeries(v1.NewBounds(0, 10)),
Blocks: []bloomshipper.BlockRef{genBlockRef(9, 20)},
},
},
},
},
},
{
// the range which needs to be generated doesn't overlap with existing blocks
// from other tsdb versions since there's an up-to-date tsdb version block,
// but we can trim the range needing generation
desc: "trims up to date area",
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)},
metas: []bloomshipper.Meta{
genMeta(9, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for same tsdb
genMeta(9, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for different tsdb
},
exp: []blockPlan{
{
tsdb: tsdbID(0),
gaps: []protos.Gap{
{
Bounds: v1.NewBounds(0, 8),
Series: genSeries(v1.NewBounds(0, 8)),
},
},
},
},
},
{
desc: "uses old block for overlapping range",
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)},
metas: []bloomshipper.Meta{
genMeta(9, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for same tsdb
genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(5, 20)}), // block for different tsdb
},
exp: []blockPlan{
{
tsdb: tsdbID(0),
gaps: []protos.Gap{
{
Bounds: v1.NewBounds(0, 8),
Series: genSeries(v1.NewBounds(0, 8)),
Blocks: []bloomshipper.BlockRef{genBlockRef(5, 20)},
},
},
},
},
},
{
desc: "multi case",
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0), tsdbID(1)}, // generate for both tsdbs
metas: []bloomshipper.Meta{
genMeta(0, 2, []int{0}, []bloomshipper.BlockRef{
genBlockRef(0, 1),
genBlockRef(1, 2),
}), // tsdb_0
genMeta(6, 8, []int{0}, []bloomshipper.BlockRef{genBlockRef(6, 8)}), // tsdb_0
genMeta(3, 5, []int{1}, []bloomshipper.BlockRef{genBlockRef(3, 5)}), // tsdb_1
genMeta(8, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(8, 10)}), // tsdb_1
},
exp: []blockPlan{
{
tsdb: tsdbID(0),
gaps: []protos.Gap{
// tsdb (id=0) can source chunks from the blocks built from tsdb (id=1)
{
Bounds: v1.NewBounds(3, 5),
Series: genSeries(v1.NewBounds(3, 5)),
Blocks: []bloomshipper.BlockRef{genBlockRef(3, 5)},
},
{
Bounds: v1.NewBounds(9, 10),
Series: genSeries(v1.NewBounds(9, 10)),
Blocks: []bloomshipper.BlockRef{genBlockRef(8, 10)},
},
},
},
// tsdb (id=1) can source chunks from the blocks built from tsdb (id=0)
{
tsdb: tsdbID(1),
gaps: []protos.Gap{
{
Bounds: v1.NewBounds(0, 2),
Series: genSeries(v1.NewBounds(0, 2)),
Blocks: []bloomshipper.BlockRef{
genBlockRef(0, 1),
genBlockRef(1, 2),
},
},
{
Bounds: v1.NewBounds(6, 7),
Series: genSeries(v1.NewBounds(6, 7)),
Blocks: []bloomshipper.BlockRef{genBlockRef(6, 8)},
},
},
},
},
},
{
desc: "dedupes block refs",
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)},
metas: []bloomshipper.Meta{
genMeta(9, 20, []int{1}, []bloomshipper.BlockRef{
genBlockRef(1, 4),
genBlockRef(9, 20),
}), // blocks for first diff tsdb
genMeta(5, 20, []int{2}, []bloomshipper.BlockRef{
genBlockRef(5, 10),
genBlockRef(9, 20), // same block references in prior meta (will be deduped)
}), // block for second diff tsdb
},
exp: []blockPlan{
{
tsdb: tsdbID(0),
gaps: []protos.Gap{
{
Bounds: v1.NewBounds(0, 10),
Series: genSeries(v1.NewBounds(0, 10)),
Blocks: []bloomshipper.BlockRef{
genBlockRef(1, 4),
genBlockRef(5, 10),
genBlockRef(9, 20),
},
},
},
},
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
// We add series spanning the whole FP ownership range
tsdbs := make(map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries)
for _, id := range tc.tsdbs {
tsdbs[id] = newFakeForSeries(genSeries(tc.ownershipRange))
}
// we reuse the gapsBetweenTSDBsAndMetas function to generate the gaps as this function is tested
// separately and it's used to generate input in our regular code path (easier to write tests this way).
gaps, err := gapsBetweenTSDBsAndMetas(tc.ownershipRange, tsdbs, tc.metas)
require.NoError(t, err)
plans, err := blockPlansForGaps(
context.Background(),
"fakeTenant",
gaps,
tc.metas,
)
if tc.err {
require.Error(t, err)
return
}
require.Equal(t, tc.exp, plans)
})
}
}
func genSeries(bounds v1.FingerprintBounds) []*v1.Series {
series := make([]*v1.Series, 0, int(bounds.Max-bounds.Min+1))
for i := bounds.Min; i <= bounds.Max; i++ {
series = append(series, &v1.Series{
Fingerprint: i,
Chunks: v1.ChunkRefs{
{
From: 0,
Through: 1,
Checksum: 1,
},
},
})
}
return series
}
func genMeta(min, max model.Fingerprint, sources []int, blocks []bloomshipper.BlockRef) bloomshipper.Meta {
m := bloomshipper.Meta{
MetaRef: bloomshipper.MetaRef{
Ref: bloomshipper.Ref{
TenantID: "fakeTenant",
TableName: testTable.Addr(),
Bounds: v1.NewBounds(min, max),
},
},
Blocks: blocks,
}
for _, source := range sources {
m.Sources = append(m.Sources, tsdbID(source))
}
return m
}
func genBlockRef(min, max model.Fingerprint) bloomshipper.BlockRef {
startTS, endTS := testDay.Bounds()
return bloomshipper.BlockRef{
Ref: bloomshipper.Ref{
TenantID: "fakeTenant",
TableName: testTable.Addr(),
Bounds: v1.NewBounds(min, max),
StartTimestamp: startTS,
EndTimestamp: endTS,
Checksum: 0,
},
}
}
func tsdbID(n int) tsdb.SingleTenantTSDBIdentifier {
return tsdb.SingleTenantTSDBIdentifier{
TS: time.Unix(int64(n), 0),
}
}
func parseDayTime(s string) config.DayTime {
t, err := time.Parse("2006-01-02", s)
if err != nil {
panic(err)
}
return config.DayTime{
Time: model.TimeFromUnix(t.Unix()),
}
}
type fakeForSeries struct {
series []*v1.Series
}
func newFakeForSeries(series []*v1.Series) *fakeForSeries {
return &fakeForSeries{
series: series,
}
}
func (f fakeForSeries) ForSeries(_ context.Context, _ string, ff index.FingerprintFilter, _ model.Time, _ model.Time, fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta) (stop bool), _ ...*labels.Matcher) error {
overlapping := make([]*v1.Series, 0, len(f.series))
for _, s := range f.series {
if ff.Match(s.Fingerprint) {
overlapping = append(overlapping, s)
}
}
for _, s := range overlapping {
chunks := make([]index.ChunkMeta, 0, len(s.Chunks))
for _, c := range s.Chunks {
chunks = append(chunks, index.ChunkMeta{
MinTime: int64(c.From),
MaxTime: int64(c.Through),
Checksum: c.Checksum,
})
}
if fn(labels.EmptyLabels(), s.Fingerprint, chunks) {
break
}
}
return nil
}
func (f fakeForSeries) Close() error {
return nil
}

@@ -0,0 +1,125 @@
package strategies
import (
"fmt"
"math"
"github.com/prometheus/common/model"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
)
// SplitFingerprintKeyspaceByFactor splits the keyspace covered by model.Fingerprint into contiguous non-overlapping ranges.
func SplitFingerprintKeyspaceByFactor(factor int) []v1.FingerprintBounds {
if factor <= 0 {
return nil
}
bounds := make([]v1.FingerprintBounds, 0, factor)
// The keyspace of a Fingerprint is from 0 to max uint64.
keyspaceSize := uint64(math.MaxUint64)
// Calculate the size of each range.
rangeSize := keyspaceSize / uint64(factor)
for i := 0; i < factor; i++ {
// Calculate the start and end of the range.
start := uint64(i) * rangeSize
end := start + rangeSize - 1
// For the last range, make sure it ends at the end of the keyspace.
if i == factor-1 {
end = keyspaceSize
}
// Create a FingerprintBounds for the range and add it to the slice.
bounds = append(bounds, v1.FingerprintBounds{
Min: model.Fingerprint(start),
Max: model.Fingerprint(end),
})
}
return bounds
}
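A quick property check of the split, in test form (a sketch): the ranges must tile the keyspace contiguously, starting at 0 and ending at MaxUint64.

```go
package strategies

import (
	"math"
	"testing"

	"github.com/prometheus/common/model"
	"github.com/stretchr/testify/require"
)

func TestSplitFingerprintKeyspaceByFactor_sketch(t *testing.T) {
	bounds := SplitFingerprintKeyspaceByFactor(4)
	require.Len(t, bounds, 4)

	// Starts at the bottom of the keyspace.
	require.Equal(t, model.Fingerprint(0), bounds[0].Min)
	// Each range starts right after the previous one ends.
	for i := 1; i < len(bounds); i++ {
		require.Equal(t, bounds[i-1].Max+1, bounds[i].Min)
	}
	// The last range is clamped to the end of the keyspace.
	require.Equal(t, model.Fingerprint(math.MaxUint64), bounds[len(bounds)-1].Max)
}
```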
func FindGapsInFingerprintBounds(ownershipRange v1.FingerprintBounds, metas []v1.FingerprintBounds) (gaps []v1.FingerprintBounds, err error) {
if len(metas) == 0 {
return []v1.FingerprintBounds{ownershipRange}, nil
}
// turn the available metas into a list of non-overlapping metas
// for easier processing
var nonOverlapping []v1.FingerprintBounds
// First, we reduce the metas into a smaller set by combining overlaps. They must be sorted.
var cur *v1.FingerprintBounds
for i := 0; i < len(metas); i++ {
j := i + 1
// first iteration (i == 0), set the current meta
if cur == nil {
cur = &metas[i]
}
if j >= len(metas) {
// We've reached the end of the list. Add the last meta to the non-overlapping set.
nonOverlapping = append(nonOverlapping, *cur)
break
}
combined := cur.Union(metas[j])
if len(combined) == 1 {
// There was an overlap between the two tested ranges. Combine them and keep going.
cur = &combined[0]
continue
}
// There was no overlap between the two tested ranges. Add the first to the non-overlapping set
// and keep the second for the next iteration.
nonOverlapping = append(nonOverlapping, combined[0])
cur = &combined[1]
}
// Now, detect gaps between the non-overlapping metas and the ownership range.
// The left bound of the ownership range will be adjusted as we go.
leftBound := ownershipRange.Min
for _, meta := range nonOverlapping {
clippedMeta := meta.Intersection(ownershipRange)
// should never happen as long as we are only combining metas
// that intersect with the ownership range
if clippedMeta == nil {
return nil, fmt.Errorf("meta is not within ownership range: %v", meta)
}
searchRange := ownershipRange.Slice(leftBound, clippedMeta.Max)
// update the left bound for the next iteration
// We take the max to prevent the max bound from overflowing from MaxUint64 to 0
leftBound = min(
max(clippedMeta.Max+1, clippedMeta.Max),
max(ownershipRange.Max+1, ownershipRange.Max),
)
// since we've already ensured that the meta is within the ownership range,
// we know the xor will be of length zero (when the meta is equal to the ownership range)
// or 1 (when the meta is a subset of the ownership range)
xors := searchRange.Unless(*clippedMeta)
if len(xors) == 0 {
// meta is equal to the ownership range. This means the meta
// covers this entire section of the ownership range.
continue
}
gaps = append(gaps, xors[0])
}
// If the leftBound is less than the ownership range max and smaller than MaxUint64,
// there is a gap between the last meta and the end of the ownership range.
// Note: we check `leftBound < math.MaxUint64` since in the loop above we clamp the
// leftBound to MaxUint64 to prevent an overflow to 0: `max(clippedMeta.Max+1, clippedMeta.Max)`
if leftBound < math.MaxUint64 && leftBound <= ownershipRange.Max {
gaps = append(gaps, v1.NewBounds(leftBound, ownershipRange.Max))
}
return gaps, nil
}
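As a worked example matching the "single tsdb" test case earlier in this commit: with ownership range [0,10] and one meta covering [4,8], the uncovered slices are [0,3] and [9,10]. A sketch in test form:

```go
package strategies

import (
	"testing"

	"github.com/stretchr/testify/require"

	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
)

func TestFindGapsInFingerprintBounds_sketch(t *testing.T) {
	ownership := v1.NewBounds(0, 10)
	metas := []v1.FingerprintBounds{v1.NewBounds(4, 8)}

	gaps, err := FindGapsInFingerprintBounds(ownership, metas)
	require.NoError(t, err)
	// [4,8] is covered, so the gaps are the leading and trailing slices.
	require.Equal(t, []v1.FingerprintBounds{
		v1.NewBounds(0, 3),
		v1.NewBounds(9, 10),
	}, gaps)
}
```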

@@ -208,6 +208,7 @@ type Limits struct {
BloomBuilderResponseTimeout time.Duration `yaml:"bloom_build_builder_response_timeout" json:"bloom_build_builder_response_timeout" category:"experimental"`
BloomCreationEnabled bool `yaml:"bloom_creation_enabled" json:"bloom_creation_enabled" category:"experimental"`
BloomPlanningStrategy string `yaml:"bloom_planning_strategy" json:"bloom_planning_strategy" category:"experimental"`
BloomSplitSeriesKeyspaceBy int `yaml:"bloom_split_series_keyspace_by" json:"bloom_split_series_keyspace_by" category:"experimental"`
BloomBlockEncoding string `yaml:"bloom_block_encoding" json:"bloom_block_encoding" category:"experimental"`
@@ -389,7 +390,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
)
f.BoolVar(&l.BloomCreationEnabled, "bloom-build.enable", false, "Experimental. Whether to create blooms for the tenant.")
f.IntVar(&l.BloomSplitSeriesKeyspaceBy, "bloom-build.split-keyspace-by", 256, "Experimental. Number of splits to create for the series keyspace when building blooms. The series keyspace is split into this many parts to parallelize bloom creation.")
f.StringVar(&l.BloomPlanningStrategy, "bloom-build.planning-strategy", "split_keyspace_by_factor", "Experimental. Bloom planning strategy to use in bloom creation. Can be one of: 'split_keyspace_by_factor'")
f.IntVar(&l.BloomSplitSeriesKeyspaceBy, "bloom-build.split-keyspace-by", 256, "Experimental. Only if `bloom-build.planning-strategy` is 'split_keyspace_by_factor'. Number of splits to create for the series keyspace when building blooms. The series keyspace is split into this many parts to parallelize bloom creation.")
f.IntVar(&l.BloomBuildMaxBuilders, "bloom-build.max-builders", 0, "Experimental. Maximum number of builders to use when building blooms. 0 allows unlimited builders.")
f.DurationVar(&l.BloomBuilderResponseTimeout, "bloom-build.builder-response-timeout", 0, "Experimental. Timeout for a builder to finish a task. If a builder does not respond within this time, it is considered failed and the task will be requeued. 0 disables the timeout.")
f.IntVar(&l.BloomBuildTaskMaxRetries, "bloom-build.task-max-retries", 3, "Experimental. Maximum number of retries for a failed task. If a task fails more than this number of times, it is considered failed and will not be retried. A value of 0 disables this limit.")
@@ -996,6 +998,10 @@ func (o *Overrides) BloomCreationEnabled(userID string) bool {
return o.getOverridesForUser(userID).BloomCreationEnabled
}
func (o *Overrides) BloomPlanningStrategy(userID string) string {
return o.getOverridesForUser(userID).BloomPlanningStrategy
}
func (o *Overrides) BloomSplitSeriesKeyspaceBy(userID string) int {
return o.getOverridesForUser(userID).BloomSplitSeriesKeyspaceBy
}
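End to end, the planner resolves the strategy per tenant through these overrides. A minimal sketch with a stub `Limits` implementation (the `stubLimits` type is illustrative, not part of this commit):

```go
package main

import (
	"fmt"

	"github.com/go-kit/log"

	"github.com/grafana/loki/v3/pkg/bloombuild/planner/strategies"
)

// stubLimits satisfies strategies.Limits with fixed values.
type stubLimits struct{}

func (stubLimits) BloomPlanningStrategy(string) string   { return strategies.SplitKeyspaceStrategyName }
func (stubLimits) BloomSplitSeriesKeyspaceBy(string) int { return 256 }

func main() {
	strategy, err := strategies.NewStrategy("tenant-a", stubLimits{}, log.NewNopLogger())
	if err != nil {
		panic(err)
	}
	fmt.Printf("resolved strategy: %T\n", strategy) // *strategies.SplitKeyspaceStrategy
}
```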
