refactor: Limit processing of blocks to requested fingerprint ranges in bloom gateway (#11987)

This PR limits the amount of data being processed for a single multiplexed iteration to the union of the fingerprint bounds of its requests, instead of looking at blocks from the complete fingerprint range.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
pull/12030/head
Christian Haudum 2 years ago committed by GitHub
parent 6e4a9f3bdd
commit b47f2bfbc6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 10
      pkg/bloomgateway/processor.go
  2. 8
      pkg/bloomgateway/worker.go
  3. 35
      pkg/storage/bloom/v1/bounds.go
  4. 129
      pkg/storage/bloom/v1/bounds_test.go

@ -29,9 +29,13 @@ type processor struct {
}
func (p *processor) run(ctx context.Context, tasks []Task) error {
return p.runWithBounds(ctx, tasks, v1.MultiFingerprintBounds{{Min: 0, Max: math.MaxUint64}})
}
func (p *processor) runWithBounds(ctx context.Context, tasks []Task, bounds v1.MultiFingerprintBounds) error {
for ts, tasks := range group(tasks, func(t Task) config.DayTime { return t.table }) {
tenant := tasks[0].Tenant
err := p.processTasks(ctx, tenant, ts, []v1.FingerprintBounds{{Min: 0, Max: math.MaxUint64}}, tasks)
err := p.processTasks(ctx, tenant, ts, bounds, tasks)
if err != nil {
for _, task := range tasks {
task.CloseWithError(err)
@ -45,13 +49,13 @@ func (p *processor) run(ctx context.Context, tasks []Task) error {
return nil
}
func (p *processor) processTasks(ctx context.Context, tenant string, day config.DayTime, keyspaces []v1.FingerprintBounds, tasks []Task) error {
func (p *processor) processTasks(ctx context.Context, tenant string, day config.DayTime, keyspaces v1.MultiFingerprintBounds, tasks []Task) error {
minFpRange, maxFpRange := getFirstLast(keyspaces)
interval := bloomshipper.NewInterval(day.Bounds())
metaSearch := bloomshipper.MetaSearchParams{
TenantID: tenant,
Interval: interval,
Keyspace: v1.FingerprintBounds{Min: minFpRange.Min, Max: maxFpRange.Max},
Keyspace: v1.NewBounds(minFpRange.Min, maxFpRange.Max),
}
metas, err := p.store.FetchMetas(ctx, metaSearch)
if err != nil {

@ -10,8 +10,10 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/queue"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)
@ -147,6 +149,7 @@ func (w *worker) running(_ context.Context) error {
w.metrics.tasksDequeued.WithLabelValues(w.id, labelSuccess).Add(float64(len(items)))
tasks := make([]Task, 0, len(items))
var mb v1.MultiFingerprintBounds
for _, item := range items {
task, ok := item.(Task)
if !ok {
@ -157,10 +160,13 @@ func (w *worker) running(_ context.Context) error {
level.Debug(w.logger).Log("msg", "dequeued task", "task", task.ID)
w.pending.Delete(task.ID)
tasks = append(tasks, task)
first, last := getFirstLast(task.series)
mb = mb.Union(v1.NewBounds(model.Fingerprint(first.Fingerprint), model.Fingerprint(last.Fingerprint)))
}
start = time.Now()
err = p.run(taskCtx, tasks)
err = p.runWithBounds(taskCtx, tasks, mb)
if err != nil {
w.metrics.processDuration.WithLabelValues(w.id, labelFailure).Observe(time.Since(start).Seconds())

@ -7,6 +7,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"golang.org/x/exp/slices"
"github.com/grafana/loki/pkg/util/encoding"
)
@ -169,6 +170,40 @@ func (b FingerprintBounds) Unless(target FingerprintBounds) (res []FingerprintBo
return res
}
type MultiFingerprintBounds []FingerprintBounds
func (mb MultiFingerprintBounds) Union(target FingerprintBounds) MultiFingerprintBounds {
if len(mb) == 0 {
return MultiFingerprintBounds{target}
}
if len(mb) == 1 {
return mb[0].Union(target)
}
mb = append(mb, target)
slices.SortFunc(mb, func(a, b FingerprintBounds) int {
if a.Less(b) {
return -1
} else if a.Equal(b) {
return 0
}
return 1
})
var union MultiFingerprintBounds
for i := 0; i < len(mb); i++ {
j := len(union) - 1 // index of last item of union
if j >= 0 && union[j].Max >= mb[i].Min-1 {
union[j] = NewBounds(union[j].Min, max(mb[i].Max, union[j].Max))
} else {
union = append(union, mb[i])
}
}
mb = union
return mb
}
// unused, but illustrative
type BoundedIter[V any] struct {
Iterator[V]

@ -129,3 +129,132 @@ func Test_FingerprintBounds_Unless(t *testing.T) {
}, NewBounds(5, 25).Unless(target))
assert.Nil(t, NewBounds(14, 15).Unless(target))
}
func Test_MultiFingerprintBounds(t *testing.T) {
for _, tc := range []struct {
desc string
mb MultiFingerprintBounds
target FingerprintBounds
exp MultiFingerprintBounds
}{
{
desc: "no elements",
mb: MultiFingerprintBounds{},
target: NewBounds(0, 9),
exp: MultiFingerprintBounds{
NewBounds(0, 9),
},
},
{
desc: "single element before",
mb: MultiFingerprintBounds{
NewBounds(5, 9),
},
target: NewBounds(15, 19),
exp: MultiFingerprintBounds{
NewBounds(5, 9),
NewBounds(15, 19),
},
},
{
desc: "single element after",
mb: MultiFingerprintBounds{
NewBounds(5, 9),
},
target: NewBounds(0, 3),
exp: MultiFingerprintBounds{
NewBounds(0, 3),
NewBounds(5, 9),
},
},
{
desc: "single element overlapping",
mb: MultiFingerprintBounds{
NewBounds(5, 9),
},
target: NewBounds(0, 14),
exp: MultiFingerprintBounds{
NewBounds(0, 14),
},
},
{
desc: "multiple elements single overlapping",
mb: MultiFingerprintBounds{
NewBounds(5, 9),
NewBounds(15, 19),
},
target: NewBounds(0, 6),
exp: MultiFingerprintBounds{
NewBounds(0, 9),
NewBounds(15, 19),
},
},
{
desc: "multiple elements single overlapping",
mb: MultiFingerprintBounds{
NewBounds(5, 9),
NewBounds(15, 19),
},
target: NewBounds(11, 25),
exp: MultiFingerprintBounds{
NewBounds(5, 9),
NewBounds(11, 25),
},
},
{
desc: "multiple elements combining overlapping",
mb: MultiFingerprintBounds{
NewBounds(5, 9),
NewBounds(15, 19),
},
target: NewBounds(9, 15),
exp: MultiFingerprintBounds{
NewBounds(5, 19),
},
},
{
desc: "combination",
mb: MultiFingerprintBounds{
NewBounds(0, 2),
NewBounds(5, 9),
NewBounds(15, 19),
NewBounds(25, 29),
},
target: NewBounds(9, 15),
exp: MultiFingerprintBounds{
NewBounds(0, 2),
NewBounds(5, 19),
NewBounds(25, 29),
},
},
{
desc: "overlapping ranges",
mb: MultiFingerprintBounds{
NewBounds(0, 6),
NewBounds(5, 15),
},
target: NewBounds(8, 10),
exp: MultiFingerprintBounds{
NewBounds(0, 15),
},
},
{
desc: "disjoint ranges and target is between",
mb: MultiFingerprintBounds{
NewBounds(0, 9),
NewBounds(30, 39),
},
target: NewBounds(15, 19),
exp: MultiFingerprintBounds{
NewBounds(0, 9),
NewBounds(15, 19),
NewBounds(30, 39),
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
res := tc.mb.Union(tc.target)
assert.Equal(t, tc.exp, res)
})
}
}

Loading…
Cancel
Save