chore(blooms): Various minor code cleanups (#13332)

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
pull/13314/head^2
Christian Haudum 2 years ago committed by GitHub
parent 40ee766724
commit 3b0502ddd3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 11
      pkg/bloomgateway/client.go
  2. 6
      pkg/bloomgateway/client_test.go
  3. 2
      pkg/bloomgateway/multiplexing.go
  4. 26
      pkg/bloomgateway/processor.go
  5. 6
      pkg/bloomgateway/processor_test.go
  6. 2
      pkg/bloomgateway/util.go
  7. 37
      pkg/bloomgateway/util_test.go
  8. 8
      pkg/bloomgateway/worker.go

@@ -4,7 +4,6 @@ import (
"context"
"flag"
"io"
"math"
"sort"
"github.com/go-kit/log"
@@ -208,7 +207,6 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo
return nil, nil
}
firstFp, lastFp := uint64(math.MaxUint64), uint64(0)
pos := make(map[string]int)
servers := make([]addrWithGroups, 0, len(blocks))
for _, blockWithSeries := range blocks {
@@ -217,15 +215,6 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo
return nil, errors.Wrapf(err, "server address for block: %s", blockWithSeries.block)
}
// min/max fingerprint needed for the cache locality score
first, last := getFirstLast(blockWithSeries.series)
if first.Fingerprint < firstFp {
firstFp = first.Fingerprint
}
if last.Fingerprint > lastFp {
lastFp = last.Fingerprint
}
if idx, found := pos[addr]; found {
servers[idx].groups = append(servers[idx].groups, blockWithSeries.series...)
servers[idx].blocks = append(servers[idx].blocks, blockWithSeries.block.String())

@@ -46,17 +46,17 @@ func shortRef(f, t model.Time, c uint32) *logproto.ShortRef {
func TestGatewayClient_MergeSeries(t *testing.T) {
inputs := [][]*logproto.GroupedChunkRefs{
// response 1
// response 1 -- sorted
{
{Fingerprint: 0x00, Refs: []*logproto.ShortRef{shortRef(0, 1, 1), shortRef(1, 2, 2)}}, // not overlapping
{Fingerprint: 0x01, Refs: []*logproto.ShortRef{shortRef(0, 1, 3), shortRef(1, 2, 4)}}, // fully overlapping chunks
{Fingerprint: 0x02, Refs: []*logproto.ShortRef{shortRef(0, 1, 5), shortRef(1, 2, 6)}}, // partially overlapping chunks
},
// response 2
// response 2 -- not sorted
{
{Fingerprint: 0x03, Refs: []*logproto.ShortRef{shortRef(0, 1, 8), shortRef(1, 2, 9)}}, // not overlapping
{Fingerprint: 0x01, Refs: []*logproto.ShortRef{shortRef(0, 1, 3), shortRef(1, 2, 4)}}, // fully overlapping chunks
{Fingerprint: 0x02, Refs: []*logproto.ShortRef{shortRef(1, 2, 6), shortRef(2, 3, 7)}}, // partially overlapping chunks
{Fingerprint: 0x03, Refs: []*logproto.ShortRef{shortRef(0, 1, 8), shortRef(1, 2, 9)}}, // not overlapping
},
}

@@ -127,7 +127,7 @@ func (t Task) Copy(series []*logproto.GroupedChunkRefs) Task {
interval: t.interval,
table: t.table,
ctx: t.ctx,
done: make(chan struct{}),
done: t.done,
}
}

@@ -2,7 +2,6 @@ package bloomgateway
import (
"context"
"math"
"time"
"github.com/go-kit/log"
@@ -35,21 +34,12 @@ type processor struct {
metrics *workerMetrics
}
func (p *processor) run(ctx context.Context, tasks []Task) error {
return p.runWithBounds(ctx, tasks, v1.MultiFingerprintBounds{{Min: 0, Max: math.MaxUint64}})
}
func (p *processor) runWithBounds(ctx context.Context, tasks []Task, bounds v1.MultiFingerprintBounds) error {
func (p *processor) processTasks(ctx context.Context, tasks []Task) error {
tenant := tasks[0].tenant
level.Info(p.logger).Log(
"msg", "process tasks with bounds",
"tenant", tenant,
"tasks", len(tasks),
"bounds", len(bounds),
)
level.Info(p.logger).Log("msg", "process tasks", "tenant", tenant, "tasks", len(tasks))
for ts, tasks := range group(tasks, func(t Task) config.DayTime { return t.table }) {
err := p.processTasks(ctx, tenant, ts, bounds, tasks)
err := p.processTasksForDay(ctx, tenant, ts, tasks)
if err != nil {
for _, task := range tasks {
task.CloseWithError(err)
@@ -63,7 +53,7 @@ func (p *processor) runWithBounds(ctx context.Context, tasks []Task, bounds v1.M
return nil
}
func (p *processor) processTasks(ctx context.Context, tenant string, day config.DayTime, _ v1.MultiFingerprintBounds, tasks []Task) error {
func (p *processor) processTasksForDay(ctx context.Context, tenant string, day config.DayTime, tasks []Task) error {
level.Info(p.logger).Log("msg", "process tasks for day", "tenant", tenant, "tasks", len(tasks), "day", day.String())
var duration time.Duration
@@ -72,10 +62,10 @@ func (p *processor) processTasks(ctx context.Context, tenant string, day config.
blocksRefs = append(blocksRefs, task.blocks...)
}
data := partitionTasks(tasks, blocksRefs)
tasksByBlock := partitionTasksByBlock(tasks, blocksRefs)
refs := make([]bloomshipper.BlockRef, 0, len(data))
for _, block := range data {
refs := make([]bloomshipper.BlockRef, 0, len(tasksByBlock))
for _, block := range tasksByBlock {
refs = append(refs, block.ref)
}
@@ -103,7 +93,7 @@ func (p *processor) processTasks(ctx context.Context, tenant string, day config.
}
startProcess := time.Now()
res := p.processBlocks(ctx, bqs, data)
res := p.processBlocks(ctx, bqs, tasksByBlock)
duration = time.Since(startProcess)
for _, t := range tasks {

@@ -166,7 +166,7 @@ func TestProcessor(t *testing.T) {
}(tasks[i])
}
err := p.run(ctx, tasks)
err := p.processTasks(ctx, tasks)
wg.Wait()
require.NoError(t, err)
require.Equal(t, int64(0), results.Load())
@@ -218,7 +218,7 @@ func TestProcessor(t *testing.T) {
}(tasks[i])
}
err := p.run(ctx, tasks)
err := p.processTasks(ctx, tasks)
wg.Wait()
require.NoError(t, err)
require.Equal(t, int64(len(swb.series)), results.Load())
@@ -267,7 +267,7 @@ func TestProcessor(t *testing.T) {
}(tasks[i])
}
err := p.run(ctx, tasks)
err := p.processTasks(ctx, tasks)
wg.Wait()
require.Errorf(t, err, "store failed")
require.Equal(t, int64(0), results.Load())

@@ -48,7 +48,7 @@ type blockWithTasks struct {
tasks []Task
}
func partitionTasks(tasks []Task, blocks []bloomshipper.BlockRef) []blockWithTasks {
func partitionTasksByBlock(tasks []Task, blocks []bloomshipper.BlockRef) []blockWithTasks {
result := make([]blockWithTasks, 0, len(blocks))
for _, block := range blocks {

@@ -73,7 +73,7 @@ func mkBlockRef(minFp, maxFp uint64) bloomshipper.BlockRef {
}
}
func TestPartitionTasks(t *testing.T) {
func TestPartitionTasksByBlock(t *testing.T) {
t.Run("consecutive block ranges", func(t *testing.T) {
bounds := []bloomshipper.BlockRef{
@@ -93,7 +93,7 @@ func TestPartitionTasks(t *testing.T) {
tasks[i%nTasks].series = append(tasks[i%nTasks].series, &logproto.GroupedChunkRefs{Fingerprint: uint64(i)})
}
results := partitionTasks(tasks, bounds)
results := partitionTasksByBlock(tasks, bounds)
require.Equal(t, 3, len(results)) // ensure we only return bounds in range
actualFingerprints := make([]*logproto.GroupedChunkRefs, 0, nSeries)
@@ -128,7 +128,7 @@ func TestPartitionTasks(t *testing.T) {
task.series = append(task.series, &logproto.GroupedChunkRefs{Fingerprint: uint64(i)})
}
results := partitionTasks([]Task{task}, bounds)
results := partitionTasksByBlock([]Task{task}, bounds)
require.Equal(t, 3, len(results)) // ensure we only return bounds in range
for _, res := range results {
// ensure we have the right number of tasks per bound
@@ -153,9 +153,38 @@ func TestPartitionTasks(t *testing.T) {
},
}
results := partitionTasks(tasks, bounds)
results := partitionTasksByBlock(tasks, bounds)
require.Len(t, results, 0)
})
t.Run("overlapping and unsorted block ranges", func(t *testing.T) {
bounds := []bloomshipper.BlockRef{
mkBlockRef(5, 14),
mkBlockRef(0, 9),
mkBlockRef(10, 19),
}
tasks := []Task{
{
series: []*logproto.GroupedChunkRefs{
{Fingerprint: 6},
},
},
{
series: []*logproto.GroupedChunkRefs{
{Fingerprint: 12},
},
},
}
expected := []blockWithTasks{
{ref: bounds[0], tasks: tasks}, // both tasks
{ref: bounds[1], tasks: tasks[:1]}, // first task
{ref: bounds[2], tasks: tasks[1:]}, // second task
}
results := partitionTasksByBlock(tasks, bounds)
require.Equal(t, expected, results)
})
}
func TestPartitionRequest(t *testing.T) {

@@ -8,11 +8,9 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"go.uber.org/atomic"
"github.com/grafana/loki/v3/pkg/queue"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
)
@@ -92,7 +90,6 @@ func (w *worker) running(_ context.Context) error {
w.metrics.tasksDequeued.WithLabelValues(w.id, labelSuccess).Add(float64(len(items)))
tasks := make([]Task, 0, len(items))
var mb v1.MultiFingerprintBounds
for _, item := range items {
task, ok := item.(Task)
if !ok {
@@ -104,13 +101,10 @@ func (w *worker) running(_ context.Context) error {
w.metrics.queueDuration.WithLabelValues(w.id).Observe(time.Since(task.enqueueTime).Seconds())
FromContext(task.ctx).AddQueueTime(time.Since(task.enqueueTime))
tasks = append(tasks, task)
first, last := getFirstLast(task.series)
mb = mb.Union(v1.NewBounds(model.Fingerprint(first.Fingerprint), model.Fingerprint(last.Fingerprint)))
}
start = time.Now()
err = p.runWithBounds(taskCtx, tasks, mb)
err = p.processTasks(taskCtx, tasks)
if err != nil {
w.metrics.processDuration.WithLabelValues(w.id, labelFailure).Observe(time.Since(start).Seconds())

Loading…
Cancel
Save